Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.ably.lib.http.HttpUtils;
import io.ably.lib.objects.RealtimeObjects;
import io.ably.lib.objects.LiveObjectsPlugin;
import io.ably.lib.rest.MessageEditsMixin;
import io.ably.lib.rest.RestAnnotations;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.transport.ConnectionManager.QueuedMessage;
Expand All @@ -32,6 +33,7 @@
import io.ably.lib.types.Message;
import io.ably.lib.types.MessageAnnotations;
import io.ably.lib.types.MessageDecodeException;
import io.ably.lib.types.MessageOperation;
import io.ably.lib.types.MessageSerializer;
import io.ably.lib.types.MessageVersion;
import io.ably.lib.types.PaginatedResult;
Expand Down Expand Up @@ -100,6 +102,8 @@ public abstract class ChannelBase extends EventEmitter<ChannelEvent, ChannelStat

@Nullable private final LiveObjectsPlugin liveObjectsPlugin;

private volatile MessageEditsMixin messageEditsMixin;

public RealtimeObjects getObjects() throws AblyException {
if (liveObjectsPlugin == null) {
throw AblyException.fromErrorInfo(
Expand Down Expand Up @@ -1172,6 +1176,172 @@ else if(!"false".equalsIgnoreCase(param.value)) {
private static final String KEY_UNTIL_ATTACH = "untilAttach";
private static final String KEY_FROM_SERIAL = "fromSerial";

//region Message Edits and Deletes

/**
* Retrieves the latest version of a specific message by its serial identifier.
* <p>
* This method allows you to fetch the current state of a message, including any updates
* or deletions that have been applied since its creation.
*
* @param serial The unique serial identifier of the message to retrieve.
* @return A {@link Message} object representing the latest version of the message.
* @throws AblyException If the message cannot be retrieved or does not exist.
*/
public Message getMessage(String serial) throws AblyException {
return messageEditsMixin.getMessage(ably.http, serial);
}

/**
* Asynchronously retrieves the latest version of a specific message by its serial identifier.
*
* @param serial The unique serial identifier of the message to retrieve.
* @param callback A callback to handle the result asynchronously.
* <p>
* This callback is invoked on a background thread.
*/
public void getMessageAsync(String serial, Callback<Message> callback) {
messageEditsMixin.getMessageAsync(ably.http, serial, callback);
}

/**
* Updates an existing message using patch semantics.
* <p>
* Non-null fields in the provided message (name, data, extras) will replace the corresponding
* fields in the existing message, while null fields will be left unchanged.
*
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* Only non-null fields will be applied to the existing message.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @throws AblyException If the update operation fails.
*/
public void updateMessage(Message message, MessageOperation operation) throws AblyException {
messageEditsMixin.updateMessage(ably.http, message, operation);
}

/**
* Updates an existing message using patch semantics.
* <p>
* Non-null fields in the provided message (name, data, extras) will replace the corresponding
* fields in the existing message, while null fields will be left unchanged.
*
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* Only non-null fields will be applied to the existing message.
* @throws AblyException If the update operation fails.
*/
public void updateMessage(Message message) throws AblyException {
updateMessage(message, null);
}

/**
* Asynchronously updates an existing message.
*
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @param listener A listener to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
*/
public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) {
messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener);
}

/**
* Asynchronously updates an existing message.
*
* @param message A {@link Message} object containing the fields to update and the serial identifier.
* @param listener A listener to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
*/
public void updateMessageAsync(Message message, CompletionListener listener) {
updateMessageAsync(message, null, listener);
}

/**
* Marks a message as deleted.
* <p>
* This operation does not remove the message from history; it marks it as deleted
* while preserving the full message history. The deleted message can still be
* retrieved and will have its action set to MESSAGE_DELETE.
*
* @param message A {@link Message} message containing the serial identifier.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @throws AblyException If the delete operation fails.
*/
public void deleteMessage(Message message, MessageOperation operation) throws AblyException {
messageEditsMixin.deleteMessage(ably.http, message, operation);
}

/**
* Marks a message as deleted.
* <p>
* This operation does not remove the message from history; it marks it as deleted
* while preserving the full message history. The deleted message can still be
* retrieved and will have its action set to MESSAGE_DELETE.
*
* @param message A {@link Message} message containing the serial identifier.
* @throws AblyException If the delete operation fails.
*/
public void deleteMessage(Message message) throws AblyException {
deleteMessage(message, null);
}

/**
* Asynchronously marks a message as deleted.
*
* @param message A {@link Message} object containing the serial identifier and operation metadata.
* @param operation operation metadata such as clientId, description, or metadata in the version field
* @param listener A listener to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
*/
public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) {
messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener);
}

/**
* Asynchronously marks a message as deleted.
*
* @param message A {@link Message} object containing the serial identifier and operation metadata.
* @param listener A listener to be notified of the outcome of this operation.
* <p>
* This listener is invoked on a background thread.
*/
public void deleteMessageAsync(Message message, CompletionListener listener) {
deleteMessageAsync(message, null, listener);
}

/**
* Retrieves all historical versions of a specific message.
* <p>
* This method returns a paginated result containing all versions of the message,
* ordered chronologically. Each version includes metadata about when and by whom
* the message was modified.
*
* @param serial The unique serial identifier of the message.
* @param params Query parameters for filtering or pagination (e.g., limit, start, end).
* @return A {@link PaginatedResult} containing an array of {@link Message} objects
* representing all versions of the message.
* @throws AblyException If the versions cannot be retrieved.
*/
public PaginatedResult<Message> getMessageVersions(String serial, Param[] params) throws AblyException {
return messageEditsMixin.getMessageVersions(ably.http, serial, params);
}

/**
* Asynchronously retrieves all historical versions of a specific message.
*
* @param serial The unique serial identifier of the message.
* @param params Query parameters for filtering or pagination.
* @param callback A callback to handle the result asynchronously.
*/
public void getMessageVersionsAsync(String serial, Param[] params, Callback<AsyncPaginatedResult<Message>> callback) throws AblyException {
messageEditsMixin.getMessageVersionsAsync(ably.http, serial, params, callback);
}

//endregion

/************************************
* Channel history
************************************/
Expand Down Expand Up @@ -1278,6 +1448,7 @@ public void setOptions(ChannelOptions options) throws AblyException {
*/
public void setOptions(ChannelOptions options, CompletionListener listener) throws AblyException {
this.options = options;
this.messageEditsMixin = new MessageEditsMixin(basePath, ably.options, options, ably.auth);
if(this.shouldReattachToSetOptions(options)) {
this.attach(true, listener);
} else {
Expand Down Expand Up @@ -1353,6 +1524,7 @@ else if(stateChange.current.equals(failureState)) {
this,
new RestAnnotations(name, ably.http, ably.options, options)
);
this.messageEditsMixin = new MessageEditsMixin(basePath, ably.options, options, ably.auth);
}

void onChannelMessage(ProtocolMessage msg) {
Expand Down
Loading
Loading