Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/objects/ObjectsPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface ObjectsPlugin {
*
* @param channelName the name of the channel whose state has changed.
* @param state the new state of the channel.
* @param hasObjects flag indicates whether the channel has any associated live objects.
* @param hasObjects flag indicates whether the channel has any associated objects.
*/
void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects);

Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/objects/RealtimeObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/**
* The RealtimeObjects interface provides methods to interact with live data objects,
* such as maps and counters, in a real-time environment. It supports both synchronous
* and asynchronous operations for retrieving and creating live objects.
* and asynchronous operations for retrieving and creating objects.
*
* <p>Implementations of this interface must be thread-safe as they may be accessed
* from multiple threads concurrently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public interface ObjectsStateChange {
/**
* Subscribes to a specific Live Objects synchronization state event.
* Subscribes to a specific Objects synchronization state event.
*
* <p>This method registers the provided listener to be notified when the specified
* synchronization state event occurs. The returned subscription can be used to
Expand Down Expand Up @@ -40,7 +40,7 @@ public interface ObjectsStateChange {
void offAll();

/**
* Interface for receiving notifications about Live Objects synchronization state changes.
* Interface for receiving notifications about Objects synchronization state changes.
* <p>
* Implement this interface and register it with an ObjectsStateEmitter to be notified
* when synchronization state transitions occur.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.ably.lib.objects.state;

/**
* Represents the synchronization state of Ably Live Objects.
* Represents the synchronization state of Ably Objects.
* <p>
* This enum is used to notify listeners about state changes in the synchronization process.
* Clients can register an {@link ObjectsStateChange.Listener} to receive these events.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.ably.lib.objects.type;

import io.ably.lib.objects.ObjectsSubscription;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;

/**
* Interface for managing subscriptions to Object lifecycle events.
* <p>
* This interface provides methods to subscribe to and manage notifications about significant lifecycle
* changes that occur to Object, such as deletion. More events can be added in the future.
* Multiple listeners can be registered independently, and each can be managed separately.
* <p>
* Lifecycle events are different from data update events - they represent changes
* to the object's existence state rather than changes to the object's data content.
*
* @see ObjectLifecycleEvent for the available lifecycle events
*/
public interface ObjectLifecycleChange {
/**
* Subscribes to a specific Object lifecycle event.
*
* <p>This method registers the provided listener to be notified when the specified
* lifecycle event occurs. The returned subscription can be used to
* unsubscribe later when the notifications are no longer needed.
*
* @param event the lifecycle event to subscribe to
* @param listener the listener that will be called when the event occurs
* @return a subscription object that can be used to unsubscribe from the event
*/
@NonBlocking
ObjectsSubscription on(@NotNull ObjectLifecycleEvent event, @NotNull ObjectLifecycleChange.Listener listener);

/**
* Unsubscribes the specified listener from all lifecycle events.
*
* <p>After calling this method, the provided listener will no longer receive
* any lifecycle event notifications.
*
* @param listener the listener to unregister from all events
*/
@NonBlocking
void off(@NotNull ObjectLifecycleChange.Listener listener);

/**
* Unsubscribes all listeners from all lifecycle events.
*
* <p>After calling this method, no listeners will receive any lifecycle
* event notifications until new listeners are registered.
*/
@NonBlocking
void offAll();

/**
* Interface for receiving notifications about Object lifecycle changes.
* <p>
* Implement this interface and register it with an ObjectLifecycleChange provider
* to be notified when lifecycle events occur, such as object creation or deletion.
*/
@FunctionalInterface
interface Listener {
/**
* Called when a lifecycle event occurs.
*
* @param lifecycleEvent The lifecycle event that occurred
*/
void onLifecycleEvent(@NotNull ObjectLifecycleEvent lifecycleEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.ably.lib.objects.type;

/**
* Represents lifecycle events for an Ably Object.
* <p>
* This enum notifies listeners about significant lifecycle changes that occur to an Object during its lifetime.
* Clients can register a {@link ObjectLifecycleChange.Listener} to receive these events.
*/
public enum ObjectLifecycleEvent {
/**
* Indicates that an Object has been deleted (tombstoned).
* Emitted once when the object is tombstoned server-side (i.e., deleted and no longer addressable).
* Not re-emitted during client-side garbage collection of tombstones.
*/
DELETED
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* Abstract base class for all LiveMap/LiveCounter update notifications.
* Provides common structure for updates that occur on LiveMap and LiveCounter objects.
* Contains the update data that describes what changed in the live object.
* Contains the update data that describes what changed in the object.
* Spec: RTLO4b4
*/
public abstract class ObjectUpdate {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.ably.lib.objects.type.counter;

import io.ably.lib.objects.ObjectsCallback;
import io.ably.lib.objects.type.ObjectLifecycleChange;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
Expand All @@ -11,7 +12,7 @@
* It allows incrementing, decrementing, and retrieving the current value of the counter,
* both synchronously and asynchronously.
*/
public interface LiveCounter extends LiveCounterChange {
public interface LiveCounter extends LiveCounterChange, ObjectLifecycleChange {

/**
* Increments the value of the counter by the specified amount.
Expand Down
3 changes: 2 additions & 1 deletion lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.ably.lib.objects.type.map;

import io.ably.lib.objects.ObjectsCallback;
import io.ably.lib.objects.type.ObjectLifecycleChange;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.Contract;
Expand All @@ -14,7 +15,7 @@
* The LiveMap interface provides methods to interact with a live, real-time map structure.
* It supports both synchronous and asynchronous operations for managing key-value pairs.
*/
public interface LiveMap extends LiveMapChange {
public interface LiveMap extends LiveMapChange, ObjectLifecycleChange {

/**
* Retrieves the value associated with the specified key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import java.util.concurrent.CancellationException

/**
* Default implementation of RealtimeObjects interface.
* Provides the core functionality for managing live objects on a channel.
* Provides the core functionality for managing objects on a channel.
*/
internal class DefaultRealtimeObjects(internal val channelName: String, internal val adapter: ObjectsAdapter): RealtimeObjects {
private val tag = "DefaultRealtimeObjects"
/**
* @spec RTO3 - Objects pool storing all live objects by object ID
* @spec RTO3 - Objects pool storing all objects by object ID
*/
internal val objectsPool = ObjectsPool(this)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ internal object ObjectsPoolDefaults {
internal const val ROOT_OBJECT_ID = "root"

/**
* ObjectsPool manages a pool of live objects for a channel.
* ObjectsPool manages a pool of objects for a channel.
*
* @spec RTO3 - Maintains an objects pool for all live objects on the channel
* @spec RTO3 - Maintains an objects pool for all objects on the channel
*/
internal class ObjectsPool(
private val realtimeObjects: DefaultRealtimeObjects
Expand All @@ -39,7 +39,7 @@ internal class ObjectsPool(

/**
* ConcurrentHashMap for thread-safe access from public APIs in LiveMap and LiveCounter.
* @spec RTO3a - Pool storing all live objects by object ID
* @spec RTO3a - Pool storing all ably objects by object ID
*/
private val pool = ConcurrentHashMap<String, BaseRealtimeObject>()

Expand All @@ -57,7 +57,7 @@ internal class ObjectsPool(
}

/**
* Gets a live object from the pool by object ID.
* Gets an object from the pool by object ID.
*/
internal fun get(objectId: String): BaseRealtimeObject? {
return pool[objectId]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ private val objectsStateToEventMap = mapOf(
)

/**
* An interface for managing and communicating changes in the synchronization state of live objects.
* An interface for managing and communicating changes in the synchronization state of objects.
*
* Implementations should ensure thread-safe event emission and proper synchronization
* between state change notifications.
*/
internal interface HandlesObjectsStateChange {
/**
* Handles changes in the state of live objects by notifying all registered listeners.
* Handles changes in the state of objects by notifying all registered listeners.
* Implementations should ensure thread-safe event emission to both internal and public listeners.
* Makes sure every event is processed in the order they were received.
* @param newState The new state of the objects, SYNCING or SYNCED.
Expand Down Expand Up @@ -99,7 +99,8 @@ private class ObjectsStateEmitter : EventEmitter<ObjectsStateEvent, ObjectsState
private val tag = "ObjectsStateEmitter"
override fun apply(listener: ObjectsStateChange.Listener?, event: ObjectsStateEvent?, vararg args: Any?) {
try {
listener?.onStateChanged(event!!)
event?.let { listener?.onStateChanged(it) }
?: Log.w(tag, "Null event passed to ObjectsStateChange Listener callback")
} catch (t: Throwable) {
Log.e(tag, "Error occurred while executing listener callback for event: $event", t)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal val ObjectUpdate.noOp get() = this.update == null
internal abstract class BaseRealtimeObject(
internal val objectId: String, // // RTLO3a
internal val objectType: ObjectType,
) {
) : ObjectLifecycleCoordinator() {

protected open val tag = "BaseRealtimeObject"

Expand Down Expand Up @@ -92,7 +92,7 @@ internal abstract class BaseRealtimeObject(

if (isTombstoned) {
// this object is tombstoned so the operation cannot be applied
return;
return
}
applyObjectOperation(objectOperation, objectMessage) // RTLC7d
}
Expand All @@ -115,7 +115,7 @@ internal abstract class BaseRealtimeObject(

internal fun validateObjectId(objectId: String?) {
if (this.objectId != objectId) {
throw objectError("Invalid object: incoming objectId=${objectId}; $objectType objectId=$objectId")
throw objectError("Invalid object: incoming objectId=$objectId; $objectType objectId=${this.objectId}")
}
}

Expand All @@ -129,7 +129,8 @@ internal abstract class BaseRealtimeObject(
isTombstoned = true
tombstonedAt = serialTimestamp?: System.currentTimeMillis()
val update = clearData()
// TODO: Emit BaseRealtimeObject lifecycle events
// Emit object lifecycle event for deletion
objectLifecycleChanged(ObjectLifecycle.Deleted)
return update
}

Expand All @@ -142,13 +143,13 @@ internal abstract class BaseRealtimeObject(
}

/**
* Validates that the provided object state is compatible with this live object.
* Validates that the provided object state is compatible with this object.
* Checks object ID, type-specific validations, and any included create operations.
*/
abstract fun validate(state: ObjectState)

/**
* Applies an object state received during synchronization to this live object.
* Applies an object state received during synchronization to this object.
* This method should update the internal data structure with the complete state
* received from the server.
*
Expand All @@ -159,7 +160,7 @@ internal abstract class BaseRealtimeObject(
abstract fun applyObjectState(objectState: ObjectState, message: ObjectMessage): ObjectUpdate

/**
* Applies an operation to this live object.
* Applies an operation to this object.
* This method handles the specific operation actions (e.g., update, remove)
* by modifying the underlying data structure accordingly.
*
Expand All @@ -185,7 +186,7 @@ internal abstract class BaseRealtimeObject(
abstract fun clearData(): ObjectUpdate

/**
* Notifies subscribers about changes made to this live object. Propagates updates through the
* Notifies subscribers about changes made to this object. Propagates updates through the
* appropriate manager after converting the generic update map to type-specific update objects.
* Spec: RTLO4b4c
*/
Expand Down
Loading
Loading