From c6cf81dd7072d3db28322f803d9a0b2d2cb5be70 Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 19 Nov 2025 13:23:12 +0000 Subject: [PATCH] [ECO-5642] feat: add message editing support to channels - Introduced APIs to retrieve, update, and delete messages by serial identifier. - Added support for fetching historical versions of messages. - Implemented `MessageEditsMixin` to handle message editing operations. - Added tests for message editing and retrieval functionality. --- .../io/ably/lib/realtime/ChannelBase.java | 172 +++++++ .../java/io/ably/lib/rest/ChannelBase.java | 176 ++++++- .../io/ably/lib/rest/MessageEditsMixin.java | 222 ++++++++ .../java/io/ably/lib/types/BaseMessage.java | 36 +- .../io/ably/lib/types/MessageOperation.java | 100 ++++ .../lib/types/MessageOperationSerializer.java | 172 +++++++ .../io/ably/lib/types/MessageSerializer.java | 39 ++ .../test/rest/RestChannelMessageEditTest.java | 483 ++++++++++++++++++ 8 files changed, 1390 insertions(+), 10 deletions(-) create mode 100644 lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java create mode 100644 lib/src/main/java/io/ably/lib/types/MessageOperation.java create mode 100644 lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java create mode 100644 lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index d8332dedf..b43bf363c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -16,6 +16,7 @@ import io.ably.lib.http.HttpUtils; import io.ably.lib.objects.RealtimeObjects; import io.ably.lib.objects.LiveObjectsPlugin; +import io.ably.lib.rest.MessageEditsMixin; import io.ably.lib.rest.RestAnnotations; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.transport.ConnectionManager.QueuedMessage; @@ -32,6 +33,7 @@ import io.ably.lib.types.Message; import io.ably.lib.types.MessageAnnotations; import io.ably.lib.types.MessageDecodeException; +import io.ably.lib.types.MessageOperation; import io.ably.lib.types.MessageSerializer; import io.ably.lib.types.MessageVersion; import io.ably.lib.types.PaginatedResult; @@ -100,6 +102,8 @@ public abstract class ChannelBase extends EventEmitter + * This method allows you to fetch the current state of a message, including any updates + * or deletions that have been applied since its creation. + * + * @param serial The unique serial identifier of the message to retrieve. + * @return A {@link Message} object representing the latest version of the message. + * @throws AblyException If the message cannot be retrieved or does not exist. + */ + public Message getMessage(String serial) throws AblyException { + return messageEditsMixin.getMessage(ably.http, serial); + } + + /** + * Asynchronously retrieves the latest version of a specific message by its serial identifier. + * + * @param serial The unique serial identifier of the message to retrieve. + * @param callback A callback to handle the result asynchronously. + *

+ * This callback is invoked on a background thread. + */ + public void getMessageAsync(String serial, Callback callback) { + messageEditsMixin.getMessageAsync(ably.http, serial, callback); + } + + /** + * Updates an existing message using patch semantics. + *

+ * Non-null fields in the provided message (name, data, extras) will replace the corresponding + * fields in the existing message, while null fields will be left unchanged. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * Only non-null fields will be applied to the existing message. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @throws AblyException If the update operation fails. + */ + public void updateMessage(Message message, MessageOperation operation) throws AblyException { + messageEditsMixin.updateMessage(ably.http, message, operation); + } + + /** + * Updates an existing message using patch semantics. + *

+ * Non-null fields in the provided message (name, data, extras) will replace the corresponding + * fields in the existing message, while null fields will be left unchanged. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * Only non-null fields will be applied to the existing message. + * @throws AblyException If the update operation fails. + */ + public void updateMessage(Message message) throws AblyException { + updateMessage(message, null); + } + + /** + * Asynchronously updates an existing message. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { + messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener); + } + + /** + * Asynchronously updates an existing message. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void updateMessageAsync(Message message, CompletionListener listener) { + updateMessageAsync(message, null, listener); + } + + /** + * Marks a message as deleted. + *

+ * This operation does not remove the message from history; it marks it as deleted + * while preserving the full message history. The deleted message can still be + * retrieved and will have its action set to MESSAGE_DELETE. + * + * @param message A {@link Message} message containing the serial identifier. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @throws AblyException If the delete operation fails. + */ + public void deleteMessage(Message message, MessageOperation operation) throws AblyException { + messageEditsMixin.deleteMessage(ably.http, message, operation); + } + + /** + * Marks a message as deleted. + *

+ * This operation does not remove the message from history; it marks it as deleted + * while preserving the full message history. The deleted message can still be + * retrieved and will have its action set to MESSAGE_DELETE. + * + * @param message A {@link Message} message containing the serial identifier. + * @throws AblyException If the delete operation fails. + */ + public void deleteMessage(Message message) throws AblyException { + deleteMessage(message, null); + } + + /** + * Asynchronously marks a message as deleted. + * + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { + messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener); + } + + /** + * Asynchronously marks a message as deleted. + * + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, CompletionListener listener) { + deleteMessageAsync(message, null, listener); + } + + /** + * Retrieves all historical versions of a specific message. + *

+ * This method returns a paginated result containing all versions of the message, + * ordered chronologically. Each version includes metadata about when and by whom + * the message was modified. + * + * @param serial The unique serial identifier of the message. + * @param params Query parameters for filtering or pagination (e.g., limit, start, end). + * @return A {@link PaginatedResult} containing an array of {@link Message} objects + * representing all versions of the message. + * @throws AblyException If the versions cannot be retrieved. + */ + public PaginatedResult getMessageVersions(String serial, Param[] params) throws AblyException { + return messageEditsMixin.getMessageVersions(ably.http, serial, params); + } + + /** + * Asynchronously retrieves all historical versions of a specific message. + * + * @param serial The unique serial identifier of the message. + * @param params Query parameters for filtering or pagination. + * @param callback A callback to handle the result asynchronously. + */ + public void getMessageVersionsAsync(String serial, Param[] params, Callback> callback) throws AblyException { + messageEditsMixin.getMessageVersionsAsync(ably.http, serial, params, callback); + } + + //endregion + /************************************ * Channel history ************************************/ @@ -1278,6 +1448,7 @@ public void setOptions(ChannelOptions options) throws AblyException { */ public void setOptions(ChannelOptions options, CompletionListener listener) throws AblyException { this.options = options; + this.messageEditsMixin = new MessageEditsMixin(basePath, ably.options, options, ably.auth); if(this.shouldReattachToSetOptions(options)) { this.attach(true, listener); } else { @@ -1353,6 +1524,7 @@ else if(stateChange.current.equals(failureState)) { this, new RestAnnotations(name, ably.http, ably.options, options) ); + this.messageEditsMixin = new MessageEditsMixin(basePath, ably.options, options, ably.auth); } void onChannelMessage(ProtocolMessage msg) { diff --git a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java index 11958e7b4..47ec7bb4c 100644 --- a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java @@ -11,6 +11,7 @@ import io.ably.lib.types.Callback; import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.Message; +import io.ably.lib.types.MessageOperation; import io.ably.lib.types.MessageSerializer; import io.ably.lib.types.PaginatedResult; import io.ably.lib.types.Param; @@ -45,13 +46,15 @@ public class ChannelBase { public final RestAnnotations annotations; + private final MessageEditsMixin messageEditsMixin; + + /** * Publish a message on this channel using the REST API. * Since the REST API is stateless, this request is made independently * of any other request on this or any other channel. * @param name the event name - * @param data the message payload; see {@link io.ably.types.Data} for - * details of supported data types. + * @param data the message payload; * @throws AblyException */ public void publish(String name, Object data) throws AblyException { @@ -68,7 +71,7 @@ void publish(Http http, String name, Object data) throws AblyException { * of any other request on this or any other channel. * * @param name the event name - * @param data the message payload; see {@link io.ably.types.Data} for + * @param data the message payload; * @param listener a listener to be notified of the outcome of this message. *

* This listener is invoked on a background thread. @@ -311,6 +314,172 @@ private BasePaginatedQuery.ResultRequest historyImpl(Http http, } + //region Message Edits and Deletes + + /** + * Retrieves the latest version of a specific message by its serial identifier. + *

+ * This method allows you to fetch the current state of a message, including any updates + * or deletions that have been applied since its creation. + * + * @param serial The unique serial identifier of the message to retrieve. + * @return A {@link Message} object representing the latest version of the message. + * @throws AblyException If the message cannot be retrieved or does not exist. + */ + public Message getMessage(String serial) throws AblyException { + return messageEditsMixin.getMessage(ably.http, serial); + } + + /** + * Asynchronously retrieves the latest version of a specific message by its serial identifier. + * + * @param serial The unique serial identifier of the message to retrieve. + * @param callback A callback to handle the result asynchronously. + *

+ * This callback is invoked on a background thread. + */ + public void getMessageAsync(String serial, Callback callback) { + messageEditsMixin.getMessageAsync(ably.http, serial, callback); + } + + /** + * Updates an existing message using patch semantics. + *

+ * Non-null fields in the provided message (name, data, extras) will replace the corresponding + * fields in the existing message, while null fields will be left unchanged. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * Only non-null fields will be applied to the existing message. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @throws AblyException If the update operation fails. + */ + public void updateMessage(Message message, MessageOperation operation) throws AblyException { + messageEditsMixin.updateMessage(ably.http, message, operation); + } + + /** + * Updates an existing message using patch semantics. + *

+ * Non-null fields in the provided message (name, data, extras) will replace the corresponding + * fields in the existing message, while null fields will be left unchanged. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * Only non-null fields will be applied to the existing message. + * @throws AblyException If the update operation fails. + */ + public void updateMessage(Message message) throws AblyException { + updateMessage(message, null); + } + + /** + * Asynchronously updates an existing message. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { + messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener); + } + + /** + * Asynchronously updates an existing message. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void updateMessageAsync(Message message, CompletionListener listener) { + updateMessageAsync(message, null, listener); + } + + /** + * Marks a message as deleted. + *

+ * This operation does not remove the message from history; it marks it as deleted + * while preserving the full message history. The deleted message can still be + * retrieved and will have its action set to MESSAGE_DELETE. + * + * @param message A {@link Message} message containing the serial identifier. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @throws AblyException If the delete operation fails. + */ + public void deleteMessage(Message message, MessageOperation operation) throws AblyException { + messageEditsMixin.deleteMessage(ably.http, message, operation); + } + + /** + * Marks a message as deleted. + *

+ * This operation does not remove the message from history; it marks it as deleted + * while preserving the full message history. The deleted message can still be + * retrieved and will have its action set to MESSAGE_DELETE. + * + * @param message A {@link Message} message containing the serial identifier. + * @throws AblyException If the delete operation fails. + */ + public void deleteMessage(Message message) throws AblyException { + deleteMessage(message, null); + } + + /** + * Asynchronously marks a message as deleted. + * + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param operation operation metadata such as clientId, description, or metadata in the version field + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { + messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener); + } + + /** + * Asynchronously marks a message as deleted. + * + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, CompletionListener listener) { + deleteMessageAsync(message, null, listener); + } + + /** + * Retrieves all historical versions of a specific message. + *

+ * This method returns a paginated result containing all versions of the message, + * ordered chronologically. Each version includes metadata about when and by whom + * the message was modified. + * + * @param serial The unique serial identifier of the message. + * @param params Query parameters for filtering or pagination (e.g., limit, start, end). + * @return A {@link PaginatedResult} containing an array of {@link Message} objects + * representing all versions of the message. + * @throws AblyException If the versions cannot be retrieved. + */ + public PaginatedResult getMessageVersions(String serial, Param[] params) throws AblyException { + return messageEditsMixin.getMessageVersions(ably.http, serial, params); + } + + /** + * Asynchronously retrieves all historical versions of a specific message. + * + * @param serial The unique serial identifier of the message. + * @param params Query parameters for filtering or pagination. + * @param callback A callback to handle the result asynchronously. + */ + public void getMessageVersionsAsync(String serial, Param[] params, Callback> callback) throws AblyException { + messageEditsMixin.getMessageVersionsAsync(ably.http, serial, params, callback); + } + + //endregion + /****************** * internal * @throws AblyException @@ -323,6 +492,7 @@ private BasePaginatedQuery.ResultRequest historyImpl(Http http, this.basePath = "/channels/" + HttpUtils.encodeURIComponent(name); this.presence = new Presence(); this.annotations = new RestAnnotations(name, ably.http, ably.options, options); + this.messageEditsMixin = new MessageEditsMixin(basePath, ably.options, options, ably.auth); } private final AblyBase ably; diff --git a/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java b/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java new file mode 100644 index 000000000..d08400f10 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java @@ -0,0 +1,222 @@ +package io.ably.lib.rest; + +import io.ably.lib.http.BasePaginatedQuery; +import io.ably.lib.http.Http; +import io.ably.lib.http.HttpCore; +import io.ably.lib.http.HttpUtils; +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.AsyncPaginatedResult; +import io.ably.lib.types.Callback; +import io.ably.lib.types.ChannelOptions; +import io.ably.lib.types.ClientOptions; +import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.Message; +import io.ably.lib.types.MessageOperation; +import io.ably.lib.types.MessageOperationSerializer; +import io.ably.lib.types.MessageSerializer; +import io.ably.lib.types.PaginatedResult; +import io.ably.lib.types.Param; +import io.ably.lib.util.Crypto; + +public class MessageEditsMixin { + + private final String basePath; + + private final ClientOptions clientOptions; + + private final ChannelOptions channelOptions; + + private final Auth auth; + + public MessageEditsMixin(String basePath, ClientOptions clientOptions, ChannelOptions channelOptions, Auth auth) { + this.basePath = basePath; + this.clientOptions = clientOptions; + this.channelOptions = channelOptions; + this.auth = auth; + } + + /** + * Retrieves the latest version of a specific message by its serial identifier. + *

+ * This method allows you to fetch the current state of a message, including any updates + * or deletions that have been applied since its creation. + * + * @param serial The unique serial identifier of the message to retrieve. + * @return A {@link Message} object representing the latest version of the message. + * @throws AblyException If the message cannot be retrieved or does not exist. + */ + public Message getMessage(Http http, String serial) throws AblyException { + return getMessageImpl(http, serial).sync(); + } + + /** + * Asynchronously retrieves the latest version of a specific message by its serial identifier. + * + * @param serial The unique serial identifier of the message to retrieve. + * @param callback A callback to handle the result asynchronously. + *

+ * This callback is invoked on a background thread. + */ + public void getMessageAsync(Http http, String serial, Callback callback) { + getMessageImpl(http, serial).async(callback); + } + + private Http.Request getMessageImpl(Http http, String serial) { + return http.request((scheduler, callback) -> { + if (serial == null || serial.isEmpty()) { + throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); + } + HttpCore.BodyHandler bodyHandler = MessageSerializer.getSingleMessageResponseHandler(channelOptions); + final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; + scheduler.get(basePath + "/messages/" + HttpUtils.encodeURIComponent(serial), + HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), + params, + (response, error) -> { + if (error != null) throw AblyException.fromErrorInfo(error); + Message[] messages = bodyHandler.handleResponseBody(response.contentType, response.body); + if (messages != null && messages.length > 0) return messages[0]; + throw AblyException.fromErrorInfo(new ErrorInfo("Message not found", 404, 40400)); + }, + true, + callback); + }); + } + + /** + * Updates an existing message using patch semantics. + *

+ * Non-null fields in the provided message (name, data, extras) will replace the corresponding + * fields in the existing message, while null fields will be left unchanged. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * Only non-null fields will be applied to the existing message. + * @throws AblyException If the update operation fails. + */ + public void updateMessage(Http http, Message message, MessageOperation operation) throws AblyException { + updateMessageImpl(http, message, operation).sync(); + } + + /** + * Asynchronously updates an existing message. + * + * @param message A {@link Message} object containing the fields to update and the serial identifier. + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void updateMessageAsync(Http http, Message message, MessageOperation operation, CompletionListener listener) { + updateMessageImpl(http, message, operation).async(new CompletionListener.ToCallback(listener)); + } + + private Http.Request updateMessageImpl(Http http, Message message, MessageOperation operation) { + return http.request((scheduler, callback) -> { + if (message.serial == null || message.serial.isEmpty()) { + throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); + } + /* RTL6g3 */ + auth.checkClientId(message, true, false); + + HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol + ? MessageOperationSerializer.asMsgPackRequest(message, operation, channelOptions) + : MessageOperationSerializer.asJsonRequest(message, operation, channelOptions); + final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; + + scheduler.patch(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial), + HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), + params, + requestBody, + null, + true, + callback); + }); + } + + /** + * Marks a message as deleted. + *

+ * This operation does not remove the message from history; it marks it as deleted + * while preserving the full message history. The deleted message can still be + * retrieved and will have its action set to MESSAGE_DELETE. + * + * @param message A {@link Message} message containing the serial identifier. + * @throws AblyException If the delete operation fails. + */ + public void deleteMessage(Http http, Message message, MessageOperation operation) throws AblyException { + deleteMessageImpl(http, message, operation).sync(); + } + + /** + * Asynchronously marks a message as deleted. + * + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param listener A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Http http, Message message, MessageOperation operation, CompletionListener listener) { + deleteMessageImpl(http, message, operation).async(new CompletionListener.ToCallback(listener)); + } + + private Http.Request deleteMessageImpl(Http http, Message message, MessageOperation operation) { + return http.request((scheduler, callback) -> { + if (message.serial == null || message.serial.isEmpty()) { + throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); + } + /* RTL6g3 */ + auth.checkClientId(message, true, false); + + HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol + ? MessageOperationSerializer.asMsgPackRequest(message, operation, channelOptions) + : MessageOperationSerializer.asJsonRequest(message, operation, channelOptions); + final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; + + scheduler.post(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial) + "/delete", + HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), + params, + requestBody, + null, + true, + callback); + }); + } + + /** + * Retrieves all historical versions of a specific message. + *

+ * This method returns a paginated result containing all versions of the message, + * ordered chronologically. Each version includes metadata about when and by whom + * the message was modified. + * + * @param serial The unique serial identifier of the message. + * @param params Query parameters for filtering or pagination (e.g., limit, start, end). + * @return A {@link PaginatedResult} containing an array of {@link Message} objects + * representing all versions of the message. + * @throws AblyException If the versions cannot be retrieved. + */ + public PaginatedResult getMessageVersions(Http http, String serial, Param[] params) throws AblyException { + return getMessageVersionsImpl(http, serial, params).sync(); + } + + /** + * Asynchronously retrieves all historical versions of a specific message. + * + * @param serial The unique serial identifier of the message. + * @param params Query parameters for filtering or pagination. + * @param callback A callback to handle the result asynchronously. + */ + public void getMessageVersionsAsync(Http http, String serial, Param[] params, Callback> callback) throws AblyException { + getMessageVersionsImpl(http, serial, params).async(callback); + } + + private BasePaginatedQuery.ResultRequest getMessageVersionsImpl(Http http, String serial, Param[] initialParams) throws AblyException { + if (serial == null || serial.isEmpty()) { + throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); + } + HttpCore.BodyHandler bodyHandler = MessageSerializer.getMessageResponseHandler(channelOptions); + final Param[] params = clientOptions.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; + return (new BasePaginatedQuery<>(http, basePath + "/messages/" + HttpUtils.encodeURIComponent(serial) + "/versions", + HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), params, bodyHandler)).get(); + } + +} diff --git a/lib/src/main/java/io/ably/lib/types/BaseMessage.java b/lib/src/main/java/io/ably/lib/types/BaseMessage.java index 10c6ba42b..52558b642 100644 --- a/lib/src/main/java/io/ably/lib/types/BaseMessage.java +++ b/lib/src/main/java/io/ably/lib/types/BaseMessage.java @@ -180,26 +180,38 @@ else if (lastPayload != null) } public void encode(ChannelOptions opts) throws AblyException { - if(data != null) { - if(data instanceof JsonElement) { - data = Serialisation.gson.toJson((JsonElement)data); + EncodedMessageData encodedData = encodeData(opts); + this.data = encodedData.data; + this.encoding = encodedData.encoding; + } + + EncodedMessageData encodeData(ChannelOptions opts) throws AblyException { + Object decodedData = this.data; + String encoding = this.encoding; + + if (decodedData != null) { + if (decodedData instanceof JsonElement) { + decodedData = Serialisation.gson.toJson((JsonElement) decodedData); encoding = ((encoding == null) ? "" : encoding + "/") + "json"; } - if(data instanceof String) { + if (decodedData instanceof String) { if (opts != null && opts.encrypted) { - try { data = ((String)data).getBytes("UTF-8"); } catch(UnsupportedEncodingException e) {} + try { decodedData = ((String)decodedData).getBytes("UTF-8"); } catch(UnsupportedEncodingException e) {} encoding = ((encoding == null) ? "" : encoding + "/") + "utf-8"; } - } else if(!(data instanceof byte[])) { + } else if (!(decodedData instanceof byte[])) { Log.d(TAG, "Message data must be either `byte[]`, `String` or `JSONElement`; implicit coercion of other types to String is deprecated"); throw AblyException.fromErrorInfo(new ErrorInfo("Invalid message data or encoding", 400, 40013)); } } + if (opts != null && opts.encrypted) { EncryptingChannelCipher cipher = Crypto.createChannelEncipher(opts.getCipherParamsOrDefault()); - data = cipher.encrypt((byte[]) data); + decodedData = cipher.encrypt((byte[]) decodedData); encoding = ((encoding == null) ? "" : encoding + "/") + "cipher+" + cipher.getAlgorithm(); } + + return new EncodedMessageData(decodedData, encoding); } /* trivial utilities for processing encoding string */ @@ -367,4 +379,14 @@ void writeFields(MessagePacker packer) throws IOException { } private static final String TAG = BaseMessage.class.getName(); + + static class EncodedMessageData { + public final Object data; + public final String encoding; + + EncodedMessageData(Object data, String encoding) { + this.data = data; + this.encoding = encoding; + } + } } diff --git a/lib/src/main/java/io/ably/lib/types/MessageOperation.java b/lib/src/main/java/io/ably/lib/types/MessageOperation.java new file mode 100644 index 000000000..ecb16f3e6 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/MessageOperation.java @@ -0,0 +1,100 @@ +package io.ably.lib.types; + +import com.google.gson.JsonObject; +import io.ably.lib.util.Serialisation; +import org.msgpack.core.MessagePacker; + +import java.io.IOException; +import java.util.Map; + +/** + * Represents metadata about a message operation (update/delete). + * Contains optional information about who performed the operation and why. + */ +public class MessageOperation { + + private static final String CLIENT_ID = "clientId"; + private static final String DESCRIPTION = "description"; + private static final String METADATA = "metadata"; + + /** + * Optional identifier of the client performing the operation. + */ + public String clientId; + + /** + * Optional human-readable description of the operation. + */ + public String description; + + /** + * Optional dictionary of key-value pairs containing additional metadata about the operation. + */ + public Map metadata; + + /** + * Default constructor + */ + public MessageOperation() { + } + + /** + * Constructor with all fields + */ + public MessageOperation(String clientId, String description, Map metadata) { + this.clientId = clientId; + this.description = description; + this.metadata = metadata; + } + + /** + * Writes this MessageOperation to MessagePack format. + * + * @param packer The MessagePacker to write to + * @throws IOException If writing fails + */ + void writeMsgpack(MessagePacker packer) throws IOException { + int fieldCount = 0; + if (clientId != null) ++fieldCount; + if (description != null) ++fieldCount; + if (metadata != null) ++fieldCount; + + packer.packMapHeader(fieldCount); + + if (clientId != null) { + packer.packString(CLIENT_ID); + packer.packString(clientId); + } + if (description != null) { + packer.packString(DESCRIPTION); + packer.packString(description); + } + if (metadata != null) { + packer.packString(METADATA); + MessageSerializer.write(metadata, packer); + } + } + + + /** + * Converts this MessageOperation to a JsonObject. + * + * @return JsonObject representation + */ + JsonObject asJsonObject() { + JsonObject json = new JsonObject(); + + if (clientId != null) { + json.addProperty(CLIENT_ID, clientId); + } + if (description != null) { + json.addProperty(DESCRIPTION, description); + } + if (metadata != null) { + json.add(METADATA, Serialisation.gson.toJsonTree(metadata)); + } + + return json; + } + +} diff --git a/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java new file mode 100644 index 000000000..0dded32cf --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java @@ -0,0 +1,172 @@ +package io.ably.lib.types; + +import com.google.gson.JsonObject; +import io.ably.lib.http.HttpCore; +import io.ably.lib.http.HttpUtils; +import io.ably.lib.util.Base64Coder; +import io.ably.lib.util.Log; +import io.ably.lib.util.Serialisation; +import org.msgpack.core.MessagePacker; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * MessageOperationSerializer: internal + * Utility class to serialize message update/delete requests in different formats. + */ +public class MessageOperationSerializer { + + /** + * Creates a JSON request body for a message update/delete operation. + * + * @param message The message containing the update/delete data + * @param operation The MessageOperation metadata + * @param channelOptions Channel options for encoding + * @return HttpCore.RequestBody for the request + * @throws AblyException If encoding fails + */ + public static HttpCore.RequestBody asJsonRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { + UpdateDeleteRequest request = new UpdateDeleteRequest(message, operation, channelOptions); + return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(request.asJsonObject())); + } + + /** + * Creates a MessagePack request body for a message update/delete operation. + * + * @param message The message containing the update/delete data + * @param operation The MessageOperation metadata + * @param channelOptions Channel options for encoding + * @return HttpCore.RequestBody for the request + * @throws AblyException If encoding fails + */ + public static HttpCore.RequestBody asMsgPackRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { + UpdateDeleteRequest request = new UpdateDeleteRequest(message, operation, channelOptions); + byte[] packed = writeMsgpack(request); + return new HttpUtils.ByteArrayRequestBody(packed, "application/x-msgpack"); + } + + /** + * Serializes an UpdateDeleteRequest to MessagePack format. + * + * @param request The request to serialize + * @return byte array containing the MessagePack data + */ + private static byte[] writeMsgpack(UpdateDeleteRequest request) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); + request.writeMsgpack(packer); + packer.flush(); + return out.toByteArray(); + } catch (IOException e) { + Log.e(TAG, "Failed to write msgpack", e); + return null; + } + } + + /** + * Represents a request to update or delete a message. + * Contains the message data and operation metadata. + */ + static class UpdateDeleteRequest { + private static final String NAME = "name"; + private static final String DATA = "data"; + private static final String ENCODING = "encoding"; + private static final String EXTRAS = "extras"; + private static final String OPERATION = "operation"; + + public final String name; + public final Object data; + public final String encoding; + public final MessageExtras extras; + public final MessageOperation operation; + + /** + * Constructs an UpdateDeleteRequest from a Message and operation metadata. + * + * @param message The message containing the update/delete data + * @param operation The MessageOperation metadata + * @param channelOptions Channel options for encoding the message data + * @throws AblyException If encoding fails + */ + UpdateDeleteRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { + this.operation = operation; + this.name = message.name; + this.extras = message.extras; + + BaseMessage.EncodedMessageData encodedMessageData = message.encodeData(channelOptions); + this.data = encodedMessageData.data; + this.encoding = encodedMessageData.encoding; + } + + /** + * Writes this UpdateDeleteRequest to MessagePack format. + * + * @param packer The MessagePacker to write to + * @throws IOException If writing fails + */ + void writeMsgpack(MessagePacker packer) throws IOException { + int fieldCount = 0; + if (name != null) ++fieldCount; + if (data != null) ++fieldCount; + if (encoding != null) ++fieldCount; + if (extras != null) ++fieldCount; + if (operation != null) ++fieldCount; + + packer.packMapHeader(fieldCount); + + if (name != null) { + packer.packString(NAME); + packer.packString(name); + } + if (data != null) { + packer.packString(DATA); + if (data instanceof byte[]) { + byte[] byteData = (byte[])data; + packer.packBinaryHeader(byteData.length); + packer.writePayload(byteData); + } else { + packer.packString(data.toString()); + } + } + if (encoding != null) { + packer.packString(ENCODING); + packer.packString(encoding); + } + if (extras != null) { + packer.packString(EXTRAS); + extras.write(packer); + } + if (operation != null) { + packer.packString(OPERATION); + operation.writeMsgpack(packer); + } + } + + /** + * Base for gson serialisers. + */ + JsonObject asJsonObject() { + JsonObject json = new JsonObject(); + Object data = this.data; + String encoding = this.encoding; + if (data != null) { + if (data instanceof byte[]) { + byte[] dataBytes = (byte[])data; + json.addProperty(DATA, new String(Base64Coder.encode(dataBytes))); + encoding = (encoding == null) ? "base64" : encoding + "/base64"; + } else { + json.addProperty(DATA, data.toString()); + } + if (encoding != null) json.addProperty(ENCODING, encoding); + } + if (this.name != null) json.addProperty(NAME, this.name); + if (this.extras != null) json.add(EXTRAS, this.extras.asJsonObject()); + if (this.operation != null) json.add(OPERATION, this.operation.asJsonObject()); + return json; + } + } + + private static final String TAG = MessageOperationSerializer.class.getName(); +} diff --git a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java index f8540bf1e..5995ddfdf 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java +++ b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java @@ -154,6 +154,10 @@ public static HttpCore.BodyHandler getMessageResponseHandler(ChannelOpt return opts == null ? messageResponseHandler : new MessageBodyHandler(opts); } + public static HttpCore.BodyHandler getSingleMessageResponseHandler(ChannelOptions opts) { + return new SingleMessageBodyHandler(opts); + } + private static class MessageBodyHandler implements HttpCore.BodyHandler { MessageBodyHandler(ChannelOptions opts) { this.opts = opts; } @@ -184,6 +188,41 @@ else if("application/x-msgpack".equals(contentType)) private ChannelOptions opts; } + private static class SingleMessageBodyHandler implements HttpCore.BodyHandler { + + private final ChannelOptions opts; + + SingleMessageBodyHandler(ChannelOptions opts) { this.opts = opts; } + + @Override + public Message[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + Message message = null; + if ("application/json".equals(contentType)) { + message = Serialisation.gson.fromJson(new String(body), Message.class); + } else if ("application/x-msgpack".equals(contentType)) { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(body); + try { + message = new Message().readMsgpack(unpacker); + } catch (IOException ioe) { + throw AblyException.fromThrowable(ioe); + } + } + + if (message != null) { + try { + message.decode(opts); + } catch (MessageDecodeException e) { + Log.e(TAG, e.errorInfo.message); + } + } + return new Message[] { message }; + } catch (MessageDecodeException e) { + throw AblyException.fromThrowable(e); + } + } + } + private static HttpCore.BodyHandler messageResponseHandler = new MessageBodyHandler(null); private static final String TAG = MessageSerializer.class.getName(); } diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java new file mode 100644 index 000000000..a15fbb3c8 --- /dev/null +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java @@ -0,0 +1,483 @@ +package io.ably.lib.test.rest; + +import io.ably.lib.network.EngineType; +import io.ably.lib.network.HttpEngineFactory; +import io.ably.lib.rest.AblyRest; +import io.ably.lib.rest.Channel; +import io.ably.lib.test.common.Helpers.CompletionSet; +import io.ably.lib.test.common.ParameterizedTest; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ChannelOptions; +import io.ably.lib.types.ClientOptions; +import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; +import io.ably.lib.types.MessageOperation; +import io.ably.lib.types.PaginatedResult; +import io.ably.lib.types.Param; +import io.ably.lib.util.Crypto; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.HashMap; +import java.util.UUID; +import java.util.function.Predicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * Tests for REST channel message edit and delete operations + */ +public class RestChannelMessageEditTest extends ParameterizedTest { + + @Rule + public Timeout testTimeout = Timeout.seconds(300); + private AblyRest ably; + private EngineType engineType; + + @Before + public void setUpBefore() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + ably = new AblyRest(opts); + engineType = HttpEngineFactory.getFirstAvailable().getEngineType(); + } + + /** + * Test getMessage: Publish a message and retrieve it by serial + */ + @Test + public void getMessage_retrieveBySerial() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:get_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Test message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Retrieve the message by serial + Message retrievedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the retrieved message + assertNotNull("Expected non-null retrieved message", retrievedMessage); + assertEquals("Expected same message name", publishedMessage.name, retrievedMessage.name); + assertEquals("Expected same message data", publishedMessage.data, retrievedMessage.data); + assertEquals("Expected same serial", publishedMessage.serial, retrievedMessage.serial); + } + + /** + * Test updateMessage: Update a message's data + */ + @Test + public void updateMessage_updateData() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:update_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data"; + updateMessage.name = "updated_event"; + + channel.updateMessage(updateMessage); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was updated + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data", updatedMessage.data); + assertEquals("Expected updated message name", "updated_event", updatedMessage.name); + assertEquals("Expected action to be MESSAGE_UPDATE", MessageAction.MESSAGE_UPDATE, updatedMessage.action); + } + + /** + * Test updateMessage: Update a message's data + */ + @Test + public void updateMessage_updateEncodedData() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:update_encodedmessage_" + UUID.randomUUID() + "_" + testParams.name; + ChannelOptions channelOptions = ChannelOptions.withCipherKey(Crypto.generateRandomKey()); + Channel channel = ably.channels.get(channelName, channelOptions); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data"; + updateMessage.name = "updated_event"; + + channel.updateMessage(updateMessage); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was updated + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data", updatedMessage.data); + assertEquals("Expected updated message name", "updated_event", updatedMessage.name); + assertEquals("Expected action to be MESSAGE_UPDATE", MessageAction.MESSAGE_UPDATE, updatedMessage.action); + } + + /** + * Test updateMessage async: Update a message using async API + */ + @Test + public void updateMessage_async() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:update_message_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message using async API + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data async"; + + CompletionSet updateComplete = new CompletionSet(); + channel.updateMessageAsync(updateMessage, updateComplete.add()); + + ErrorInfo[] updateErrors = updateComplete.waitFor(); + assertEquals("Expected no errors from update", 0, updateErrors.length); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data async", updatedMessage.data); + } + + /** + * Test deleteMessage: Soft delete a message + */ + @Test + public void deleteMessage_softDelete() throws Exception { + String channelName = "mutable:delete_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Message to be deleted"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Delete the message + Message deleteMessage = new Message(); + deleteMessage.serial = publishedMessage.serial; + deleteMessage.data = "Message deleted"; + + channel.deleteMessage(deleteMessage); + + // Retrieve the deleted message + Message deletedMessage = waitForDeletedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was soft deleted + assertNotNull("Expected non-null deleted message", deletedMessage); + assertEquals("Expected action to be MESSAGE_DELETE", MessageAction.MESSAGE_DELETE, deletedMessage.action); + } + + /** + * Test deleteMessage async: Delete a message using async API + */ + @Test + public void deleteMessage_async() throws Exception { + String channelName = "mutable:delete_message_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Message to be deleted async"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Delete the message using async API + Message deleteMessage = new Message(); + deleteMessage.serial = publishedMessage.serial; + deleteMessage.data = "Message deleted async"; + + CompletionSet deleteComplete = new CompletionSet(); + channel.deleteMessageAsync(deleteMessage, deleteComplete.add()); + + ErrorInfo[] deleteErrors = deleteComplete.waitFor(); + assertEquals("Expected no errors from delete", 0, deleteErrors.length); + + // Retrieve the deleted message + Message deletedMessage = waitForDeletedMessageAppear(channel, publishedMessage.serial); + assertNotNull("Expected non-null deleted message", deletedMessage); + assertEquals("Expected action to be MESSAGE_DELETE", MessageAction.MESSAGE_DELETE, deletedMessage.action); + } + + /** + * Test getMessageVersions: Retrieve version history of a message + */ + @Test + public void getMessageVersions_retrieveHistory() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:message_versions_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message to create version history + Message updateMessage1 = new Message(); + updateMessage1.serial = publishedMessage.serial; + updateMessage1.data = "First update"; + channel.updateMessage(updateMessage1); + + Message updateMessage2 = new Message(); + updateMessage2.serial = publishedMessage.serial; + updateMessage2.data = "Second update"; + MessageOperation messageOperation = new MessageOperation(); + messageOperation.description = "description"; + messageOperation.metadata = new HashMap<>(); + messageOperation.metadata.put("key", "value"); + channel.updateMessage(updateMessage2, messageOperation); + + // Retrieve version history + PaginatedResult versions = waitForMessageAppearInVersionHistory(channel, publishedMessage.serial, null, msgs -> + msgs.length >= 3 + ); + + // Verify version history + assertNotNull("Expected non-null versions", versions); + assertTrue("Expected at least 3 versions (original + 2 updates)", versions.items().length >= 3); + + Message latestVersion = versions.items()[versions.items().length - 1]; + assertEquals("Expected latest version to have second update data", "Second update", latestVersion.data); + assertEquals("description", latestVersion.version.description); + assertEquals("value", latestVersion.version.metadata.get("key")); + } + + /** + * Test getMessageVersions async: Retrieve version history using async API + */ + @Test + public void getMessageVersions_async() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:message_versions_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + } + + /** + * Test error handling: getMessage with invalid serial + */ + @Test + public void getMessage_invalidSerial() { + String channelName = "mutable:get_message_invalid_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + channel.getMessage("invalid_serial_12345"); + }); + + assertNotNull("Expected error info", exception.errorInfo); + } + + /** + * Test error handling: updateMessage with null serial + */ + @Test + public void updateMessage_nullSerial() { + String channelName = "mutable:update_message_null_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + Message updateMessage = new Message(); + updateMessage.serial = null; + updateMessage.data = "Update data"; + + channel.updateMessage(updateMessage); + }); + + assertNotNull("Expected error info", exception.errorInfo); + assertTrue("Expected error message about serial", + exception.errorInfo.message.toLowerCase().contains("serial")); + } + + /** + * Test error handling: deleteMessage with empty serial + */ + @Test + public void deleteMessage_emptySerial() { + String channelName = "mutable:delete_message_empty_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + Message deleteMessage = new Message(); + deleteMessage.serial = ""; + deleteMessage.data = "Delete data"; + + channel.deleteMessage(deleteMessage); + }); + + assertNotNull("Expected error info", exception.errorInfo); + assertTrue("Expected error message about serial", + exception.errorInfo.message.toLowerCase().contains("serial")); + } + + /** + * Test complete workflow: publish, update, get versions, delete + */ + @Test + public void completeWorkflow_publishUpdateVersionsDelete() throws Exception { + if (engineType == EngineType.DEFAULT) return; + + String channelName = "mutable:complete_workflow_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // 1. Publish a message + channel.publish("workflow_event", "Initial data"); + + // Get the published message + PaginatedResult history = waitForMessageAppearInHistory(channel); + Message publishedMessage = history.items()[0]; + String serial = publishedMessage.serial; + + // 2. Update the message + Message updateMessage = new Message(); + updateMessage.serial = serial; + updateMessage.data = "Updated data"; + updateMessage.name = "workflow_event_updated"; + channel.updateMessage(updateMessage); + + // 3. Verify update + Message retrieved = waitForUpdatedMessageAppear(channel, serial); + assertEquals("Expected updated data", "Updated data", retrieved.data); + assertEquals("Expected MESSAGE_UPDATE action", MessageAction.MESSAGE_UPDATE, retrieved.action); + + // 4. Delete the message + Message deleteMessage = new Message(); + deleteMessage.serial = serial; + deleteMessage.data = "Deleted"; + channel.deleteMessage(deleteMessage); + + // 5. Verify deletion + Message deleted = waitForDeletedMessageAppear(channel, serial); + assertEquals("Expected MESSAGE_DELETE action", MessageAction.MESSAGE_DELETE, deleted.action); + + // 6. Verify delete appears in versions + PaginatedResult finalVersions = waitForMessageAppearInVersionHistory(channel, serial, null, msgs -> + msgs.length > 0 && msgs[msgs.length - 1].action == MessageAction.MESSAGE_DELETE + ); + assertTrue("Expected at least 3 versions (create, update, delete)", finalVersions.items().length >= 3); + } + + private PaginatedResult waitForMessageAppearInVersionHistory(Channel channel, String serial, Param[] params, Predicate predicate) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + PaginatedResult history = channel.getMessageVersions(serial, params); + if (history.items().length > 0 && predicate.test(history.items()) || System.currentTimeMillis() > timeout) + return history; + Thread.sleep(200); + } + } + + private PaginatedResult waitForMessageAppearInHistory(Channel channel) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + PaginatedResult history = channel.history(null); + if (history.items().length > 0 || System.currentTimeMillis() > timeout) return history; + Thread.sleep(200); + } + } + + private Message waitForUpdatedMessageAppear(Channel channel, String serial) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + Message message = channel.getMessage(serial); + if ((message != null && message.action == MessageAction.MESSAGE_UPDATE) || System.currentTimeMillis() > timeout) + return message; + Thread.sleep(200); + } + } + + private Message waitForDeletedMessageAppear(Channel channel, String serial) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + Message message = channel.getMessage(serial); + if ((message != null && message.action == MessageAction.MESSAGE_DELETE) || System.currentTimeMillis() > timeout) + return message; + Thread.sleep(200); + } + } +}