From 44c71e546b99442c292d6fa3d09632f7221805b9 Mon Sep 17 00:00:00 2001 From: evgeny Date: Thu, 18 Dec 2025 11:09:25 +0000 Subject: [PATCH] [AIT-99] feat: protocol v5 + message appends --- .../io/ably/lib/realtime/ChannelBase.java | 101 +++++++--- .../java/io/ably/lib/rest/ChannelBase.java | 101 +++++++--- .../io/ably/lib/rest/MessageEditsMixin.java | 103 +++++++---- .../java/io/ably/lib/transport/Defaults.java | 2 +- .../java/io/ably/lib/types/MessageAction.java | 3 +- .../lib/types/MessageOperationSerializer.java | 172 ------------------ .../io/ably/lib/types/MessageSerializer.java | 18 +- .../io/ably/lib/types/UpdateDeleteResult.java | 71 ++++++++ .../test/realtime/RealtimeHttpHeaderTest.java | 2 +- .../io/ably/lib/test/rest/HttpHeaderTest.java | 2 +- 10 files changed, 309 insertions(+), 266 deletions(-) delete mode 100644 lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java create mode 100644 lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b43bf363c..ded2c967d 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -43,6 +43,7 @@ import io.ably.lib.types.ProtocolMessage.Action; import io.ably.lib.types.ProtocolMessage.Flag; import io.ably.lib.types.Summary; +import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.CollectionUtils; import io.ably.lib.util.EventEmitter; import io.ably.lib.util.Log; @@ -1214,9 +1215,10 @@ public void getMessageAsync(String serial, Callback callback) { * Only non-null fields will be applied to the existing message. * @param operation operation metadata such as clientId, description, or metadata in the version field * @throws AblyException If the update operation fails. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.updateMessage(ably.http, message, operation); + public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.updateMessage(ably.http, message, operation); } /** @@ -1228,9 +1230,10 @@ public void updateMessage(Message message, MessageOperation operation) throws Ab * @param message A {@link Message} object containing the fields to update and the serial identifier. * Only non-null fields will be applied to the existing message. * @throws AblyException If the update operation fails. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Message message) throws AblyException { - updateMessage(message, null); + public UpdateDeleteResult updateMessage(Message message) throws AblyException { + return updateMessage(message, null); } /** @@ -1238,24 +1241,24 @@ public void updateMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the fields to update and the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener); + public void updateMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, CompletionListener listener) { - updateMessageAsync(message, null, listener); + public void updateMessageAsync(Message message, Callback callback) { + updateMessageAsync(message, null, callback); } /** @@ -1268,9 +1271,10 @@ public void updateMessageAsync(Message message, CompletionListener listener) { * @param message A {@link Message} message containing the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field * @throws AblyException If the delete operation fails. + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.deleteMessage(ably.http, message, operation); + public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.deleteMessage(ably.http, message, operation); } /** @@ -1282,9 +1286,10 @@ public void deleteMessage(Message message, MessageOperation operation) throws Ab * * @param message A {@link Message} message containing the serial identifier. * @throws AblyException If the delete operation fails. + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Message message) throws AblyException { - deleteMessage(message, null); + public UpdateDeleteResult deleteMessage(Message message) throws AblyException { + return deleteMessage(message, null); } /** @@ -1292,24 +1297,72 @@ public void deleteMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the serial identifier and operation metadata. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener); + public void deleteMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback); } /** * Asynchronously marks a message as deleted. * * @param message A {@link Message} object containing the serial identifier and operation metadata. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, Callback callback) { + deleteMessageAsync(message, null, callback); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @throws AblyException If the append operation fails. + */ + public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.appendMessage(ably.http, message, operation); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @throws AblyException If the append operation fails. + */ + public UpdateDeleteResult appendMessage(Message message) throws AblyException { + return appendMessage(message, null); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. + */ + public void appendMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, CompletionListener listener) { - deleteMessageAsync(message, null, listener); + public void appendMessageAsync(Message message, Callback callback) { + appendMessageAsync(message, null, callback); } /** diff --git a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java index 47ec7bb4c..11e46751f 100644 --- a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java @@ -17,6 +17,7 @@ import io.ably.lib.types.Param; import io.ably.lib.types.PresenceMessage; import io.ably.lib.types.PresenceSerializer; +import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.Crypto; /** @@ -352,9 +353,10 @@ public void getMessageAsync(String serial, Callback callback) { * Only non-null fields will be applied to the existing message. * @param operation operation metadata such as clientId, description, or metadata in the version field * @throws AblyException If the update operation fails. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.updateMessage(ably.http, message, operation); + public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.updateMessage(ably.http, message, operation); } /** @@ -366,9 +368,10 @@ public void updateMessage(Message message, MessageOperation operation) throws Ab * @param message A {@link Message} object containing the fields to update and the serial identifier. * Only non-null fields will be applied to the existing message. * @throws AblyException If the update operation fails. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Message message) throws AblyException { - updateMessage(message, null); + public UpdateDeleteResult updateMessage(Message message) throws AblyException { + return updateMessage(message, null); } /** @@ -376,24 +379,24 @@ public void updateMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the fields to update and the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener); + public void updateMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, CompletionListener listener) { - updateMessageAsync(message, null, listener); + public void updateMessageAsync(Message message, Callback callback) { + updateMessageAsync(message, null, callback); } /** @@ -406,9 +409,10 @@ public void updateMessageAsync(Message message, CompletionListener listener) { * @param message A {@link Message} message containing the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field * @throws AblyException If the delete operation fails. + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.deleteMessage(ably.http, message, operation); + public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.deleteMessage(ably.http, message, operation); } /** @@ -420,9 +424,10 @@ public void deleteMessage(Message message, MessageOperation operation) throws Ab * * @param message A {@link Message} message containing the serial identifier. * @throws AblyException If the delete operation fails. + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Message message) throws AblyException { - deleteMessage(message, null); + public UpdateDeleteResult deleteMessage(Message message) throws AblyException { + return deleteMessage(message, null); } /** @@ -430,24 +435,72 @@ public void deleteMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the serial identifier and operation metadata. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener); + public void deleteMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback); } /** * Asynchronously marks a message as deleted. * * @param message A {@link Message} object containing the serial identifier and operation metadata. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, Callback callback) { + deleteMessageAsync(message, null, callback); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @throws AblyException If the append operation fails. + */ + public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.appendMessage(ably.http, message, operation); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @throws AblyException If the append operation fails. + */ + public UpdateDeleteResult appendMessage(Message message) throws AblyException { + return appendMessage(message, null); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. + */ + public void appendMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, CompletionListener listener) { - deleteMessageAsync(message, null, listener); + public void appendMessageAsync(Message message, Callback callback) { + appendMessageAsync(message, null, callback); } /** diff --git a/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java b/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java index d08400f10..df0e72beb 100644 --- a/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java +++ b/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java @@ -4,7 +4,6 @@ import io.ably.lib.http.Http; import io.ably.lib.http.HttpCore; import io.ably.lib.http.HttpUtils; -import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.AblyException; import io.ably.lib.types.AsyncPaginatedResult; import io.ably.lib.types.Callback; @@ -12,11 +11,13 @@ import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageOperation; -import io.ably.lib.types.MessageOperationSerializer; import io.ably.lib.types.MessageSerializer; +import io.ably.lib.types.MessageVersion; import io.ably.lib.types.PaginatedResult; import io.ably.lib.types.Param; +import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.Crypto; public class MessageEditsMixin { @@ -92,44 +93,23 @@ private Http.Request getMessageImpl(Http http, String serial) { * @param message A {@link Message} object containing the fields to update and the serial identifier. * Only non-null fields will be applied to the existing message. * @throws AblyException If the update operation fails. + * + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Http http, Message message, MessageOperation operation) throws AblyException { - updateMessageImpl(http, message, operation).sync(); + public UpdateDeleteResult updateMessage(Http http, Message message, MessageOperation operation) throws AblyException { + return updateMessageImpl(http, message, operation, MessageAction.MESSAGE_UPDATE).sync(); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A listener to be notified of the outcome of this operation. *

* This listener is invoked on a background thread. */ - public void updateMessageAsync(Http http, Message message, MessageOperation operation, CompletionListener listener) { - updateMessageImpl(http, message, operation).async(new CompletionListener.ToCallback(listener)); - } - - private Http.Request updateMessageImpl(Http http, Message message, MessageOperation operation) { - return http.request((scheduler, callback) -> { - if (message.serial == null || message.serial.isEmpty()) { - throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); - } - /* RTL6g3 */ - auth.checkClientId(message, true, false); - - HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol - ? MessageOperationSerializer.asMsgPackRequest(message, operation, channelOptions) - : MessageOperationSerializer.asJsonRequest(message, operation, channelOptions); - final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; - - scheduler.patch(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial), - HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), - params, - requestBody, - null, - true, - callback); - }); + public void updateMessageAsync(Http http, Message message, MessageOperation operation, Callback callback) { + updateMessageImpl(http, message, operation, MessageAction.MESSAGE_UPDATE).async(callback); } /** @@ -141,24 +121,50 @@ private Http.Request updateMessageImpl(Http http, Message message, Message * * @param message A {@link Message} message containing the serial identifier. * @throws AblyException If the delete operation fails. + * + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Http http, Message message, MessageOperation operation) throws AblyException { - deleteMessageImpl(http, message, operation).sync(); + public UpdateDeleteResult deleteMessage(Http http, Message message, MessageOperation operation) throws AblyException { + return updateMessageImpl(http, message, operation, MessageAction.MESSAGE_DELETE).sync(); } /** * Asynchronously marks a message as deleted. * * @param message A {@link Message} object containing the serial identifier and operation metadata. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Http http, Message message, MessageOperation operation, Callback callback) { + updateMessageImpl(http, message, operation, MessageAction.MESSAGE_DELETE).async(callback); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + */ + public UpdateDeleteResult appendMessage(Http http, Message message, MessageOperation operation) throws AblyException { + return updateMessageImpl(http, message, operation, MessageAction.MESSAGE_APPEND).sync(); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @param callback A listener to be notified of the outcome of this operation. *

* This listener is invoked on a background thread. */ - public void deleteMessageAsync(Http http, Message message, MessageOperation operation, CompletionListener listener) { - deleteMessageImpl(http, message, operation).async(new CompletionListener.ToCallback(listener)); + public void appendMessageAsync(Http http, Message message, MessageOperation operation, Callback callback) { + updateMessageImpl(http, message, operation, MessageAction.MESSAGE_APPEND).async(callback); } - private Http.Request deleteMessageImpl(Http http, Message message, MessageOperation operation) { + private Http.Request updateMessageImpl(Http http, Message message, MessageOperation operation, MessageAction action) { return http.request((scheduler, callback) -> { if (message.serial == null || message.serial.isEmpty()) { throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); @@ -166,16 +172,33 @@ private Http.Request deleteMessageImpl(Http http, Message message, Message /* RTL6g3 */ auth.checkClientId(message, true, false); + Message updatedMessage = new Message(message.name, message.data, message.extras); + updatedMessage.action = action; + updatedMessage.version = new MessageVersion(); + if (operation != null) { + updatedMessage.version.clientId = operation.clientId; + updatedMessage.version.description = operation.description; + updatedMessage.version.metadata = operation.metadata; + } + updatedMessage.encode(channelOptions); + HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol - ? MessageOperationSerializer.asMsgPackRequest(message, operation, channelOptions) - : MessageOperationSerializer.asJsonRequest(message, operation, channelOptions); + ? MessageSerializer.asSingleMsgpackRequest(updatedMessage) + : MessageSerializer.asSingleJsonRequest(updatedMessage); final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; - scheduler.post(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial) + "/delete", + HttpCore.BodyHandler bodyHandler = UpdateDeleteResult.getBodyHandler(); + + scheduler.patch(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial), HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), params, requestBody, - null, + (response, error) -> { + if (error != null) throw AblyException.fromErrorInfo(error); + UpdateDeleteResult[] results = bodyHandler.handleResponseBody(response.contentType, response.body); + if (results != null && results.length > 0) return results[0]; + throw AblyException.fromErrorInfo(new ErrorInfo("No versionSerial in the update message response", 500, 50000)); + }, true, callback); }); diff --git a/lib/src/main/java/io/ably/lib/transport/Defaults.java b/lib/src/main/java/io/ably/lib/transport/Defaults.java index d55bce53e..83d5d83e4 100644 --- a/lib/src/main/java/io/ably/lib/transport/Defaults.java +++ b/lib/src/main/java/io/ably/lib/transport/Defaults.java @@ -12,7 +12,7 @@ public class Defaults { * spec: G4 *

*/ - public static final String ABLY_PROTOCOL_VERSION = "4"; + public static final String ABLY_PROTOCOL_VERSION = "5"; public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION); diff --git a/lib/src/main/java/io/ably/lib/types/MessageAction.java b/lib/src/main/java/io/ably/lib/types/MessageAction.java index d80f3624f..20b580233 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageAction.java +++ b/lib/src/main/java/io/ably/lib/types/MessageAction.java @@ -5,7 +5,8 @@ public enum MessageAction { MESSAGE_UPDATE, // 1 MESSAGE_DELETE, // 2 META, // 3 - MESSAGE_SUMMARY; // 4 + MESSAGE_SUMMARY, // 4 + MESSAGE_APPEND; // 5 static MessageAction tryFindByOrdinal(int ordinal) { return values().length <= ordinal ? null: values()[ordinal]; diff --git a/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java deleted file mode 100644 index 0dded32cf..000000000 --- a/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java +++ /dev/null @@ -1,172 +0,0 @@ -package io.ably.lib.types; - -import com.google.gson.JsonObject; -import io.ably.lib.http.HttpCore; -import io.ably.lib.http.HttpUtils; -import io.ably.lib.util.Base64Coder; -import io.ably.lib.util.Log; -import io.ably.lib.util.Serialisation; -import org.msgpack.core.MessagePacker; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -/** - * MessageOperationSerializer: internal - * Utility class to serialize message update/delete requests in different formats. - */ -public class MessageOperationSerializer { - - /** - * Creates a JSON request body for a message update/delete operation. - * - * @param message The message containing the update/delete data - * @param operation The MessageOperation metadata - * @param channelOptions Channel options for encoding - * @return HttpCore.RequestBody for the request - * @throws AblyException If encoding fails - */ - public static HttpCore.RequestBody asJsonRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { - UpdateDeleteRequest request = new UpdateDeleteRequest(message, operation, channelOptions); - return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(request.asJsonObject())); - } - - /** - * Creates a MessagePack request body for a message update/delete operation. - * - * @param message The message containing the update/delete data - * @param operation The MessageOperation metadata - * @param channelOptions Channel options for encoding - * @return HttpCore.RequestBody for the request - * @throws AblyException If encoding fails - */ - public static HttpCore.RequestBody asMsgPackRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { - UpdateDeleteRequest request = new UpdateDeleteRequest(message, operation, channelOptions); - byte[] packed = writeMsgpack(request); - return new HttpUtils.ByteArrayRequestBody(packed, "application/x-msgpack"); - } - - /** - * Serializes an UpdateDeleteRequest to MessagePack format. - * - * @param request The request to serialize - * @return byte array containing the MessagePack data - */ - private static byte[] writeMsgpack(UpdateDeleteRequest request) { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); - request.writeMsgpack(packer); - packer.flush(); - return out.toByteArray(); - } catch (IOException e) { - Log.e(TAG, "Failed to write msgpack", e); - return null; - } - } - - /** - * Represents a request to update or delete a message. - * Contains the message data and operation metadata. - */ - static class UpdateDeleteRequest { - private static final String NAME = "name"; - private static final String DATA = "data"; - private static final String ENCODING = "encoding"; - private static final String EXTRAS = "extras"; - private static final String OPERATION = "operation"; - - public final String name; - public final Object data; - public final String encoding; - public final MessageExtras extras; - public final MessageOperation operation; - - /** - * Constructs an UpdateDeleteRequest from a Message and operation metadata. - * - * @param message The message containing the update/delete data - * @param operation The MessageOperation metadata - * @param channelOptions Channel options for encoding the message data - * @throws AblyException If encoding fails - */ - UpdateDeleteRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { - this.operation = operation; - this.name = message.name; - this.extras = message.extras; - - BaseMessage.EncodedMessageData encodedMessageData = message.encodeData(channelOptions); - this.data = encodedMessageData.data; - this.encoding = encodedMessageData.encoding; - } - - /** - * Writes this UpdateDeleteRequest to MessagePack format. - * - * @param packer The MessagePacker to write to - * @throws IOException If writing fails - */ - void writeMsgpack(MessagePacker packer) throws IOException { - int fieldCount = 0; - if (name != null) ++fieldCount; - if (data != null) ++fieldCount; - if (encoding != null) ++fieldCount; - if (extras != null) ++fieldCount; - if (operation != null) ++fieldCount; - - packer.packMapHeader(fieldCount); - - if (name != null) { - packer.packString(NAME); - packer.packString(name); - } - if (data != null) { - packer.packString(DATA); - if (data instanceof byte[]) { - byte[] byteData = (byte[])data; - packer.packBinaryHeader(byteData.length); - packer.writePayload(byteData); - } else { - packer.packString(data.toString()); - } - } - if (encoding != null) { - packer.packString(ENCODING); - packer.packString(encoding); - } - if (extras != null) { - packer.packString(EXTRAS); - extras.write(packer); - } - if (operation != null) { - packer.packString(OPERATION); - operation.writeMsgpack(packer); - } - } - - /** - * Base for gson serialisers. - */ - JsonObject asJsonObject() { - JsonObject json = new JsonObject(); - Object data = this.data; - String encoding = this.encoding; - if (data != null) { - if (data instanceof byte[]) { - byte[] dataBytes = (byte[])data; - json.addProperty(DATA, new String(Base64Coder.encode(dataBytes))); - encoding = (encoding == null) ? "base64" : encoding + "/base64"; - } else { - json.addProperty(DATA, data.toString()); - } - if (encoding != null) json.addProperty(ENCODING, encoding); - } - if (this.name != null) json.addProperty(NAME, this.name); - if (this.extras != null) json.add(EXTRAS, this.extras.asJsonObject()); - if (this.operation != null) json.add(OPERATION, this.operation.asJsonObject()); - return json; - } - } - - private static final String TAG = MessageOperationSerializer.class.getName(); -} diff --git a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java index 5995ddfdf..b53c98110 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java +++ b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java @@ -47,8 +47,8 @@ public static Message[] readMsgpack(byte[] packed) throws AblyException { * Msgpack encode ****************************************/ - public static HttpCore.RequestBody asMsgpackRequest(Message message) throws AblyException { - return asMsgpackRequest(new Message[] { message }); + public static HttpCore.RequestBody asSingleMsgpackRequest(Message message) throws AblyException { + return new HttpUtils.ByteArrayRequestBody(write(message), "application/x-msgpack"); } public static HttpCore.RequestBody asMsgpackRequest(Message[] messages) { @@ -74,6 +74,16 @@ public static void writeMsgpackArray(Message[] messages, MessagePacker packer) { } catch(IOException e) {} } + public static byte[] write(Message message) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); + message.writeMsgpack(packer); + packer.flush(); + return out.toByteArray(); + } catch(IOException e) { return null; } + } + public static void write(final Map map, final MessagePacker packer) throws IOException { packer.packMapHeader(map.size()); for (final Map.Entry entry : map.entrySet()) { @@ -138,6 +148,10 @@ public static HttpCore.RequestBody asJsonRequest(Message message) throws AblyExc return asJsonRequest(new Message[] { message }); } + public static HttpCore.RequestBody asSingleJsonRequest(Message message) { + return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(message)); + } + public static HttpCore.RequestBody asJsonRequest(Message[] messages) { return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(messages)); } diff --git a/lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java b/lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java new file mode 100644 index 000000000..567720471 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java @@ -0,0 +1,71 @@ +package io.ably.lib.types; + +import io.ably.lib.http.HttpCore; +import io.ably.lib.util.Serialisation; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; + +public class UpdateDeleteResult { + + private static final String VERSION_SERIAL = "versionSerial"; + + public final String versionSerial; + + public UpdateDeleteResult(String versionSerial) { + this.versionSerial = versionSerial; + } + + public static UpdateDeleteResult readFromJson(byte[] packed) throws MessageDecodeException { + return Serialisation.gson.fromJson(new String(packed), UpdateDeleteResult.class); + } + + public static UpdateDeleteResult readMsgpack(byte[] packed) throws AblyException { + try { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed); + return readMsgpack(unpacker); + } catch(IOException ioe) { + throw AblyException.fromThrowable(ioe); + } + } + + public static UpdateDeleteResult readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + String versionSerial = null; + for(int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if(fieldFormat.equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + continue; + } + + if(fieldName.equals(VERSION_SERIAL)) { + versionSerial = unpacker.unpackString(); + } + } + return new UpdateDeleteResult(versionSerial); + } + + public static HttpCore.BodyHandler getBodyHandler() { + return new UpdateDeleteResultBodyHandler(); + } + + private static class UpdateDeleteResultBodyHandler implements HttpCore.BodyHandler { + + @Override + public UpdateDeleteResult[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + UpdateDeleteResult updateDeleteResult = null; + if("application/json".equals(contentType)) + updateDeleteResult = readFromJson(body); + else if("application/x-msgpack".equals(contentType)) + updateDeleteResult = readMsgpack(body); + return new UpdateDeleteResult[] { updateDeleteResult }; + } catch (MessageDecodeException e) { + throw AblyException.fromThrowable(e); + } + } + } +} diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java index 940aa3dd2..172f2a545 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java @@ -81,7 +81,7 @@ public void realtime_websocket_param_test() { * Defaults.ABLY_VERSION_PARAM, as ultimately the request param has been derived from those values. */ assertEquals("Verify correct version", requestParameters.get("v"), - Collections.singletonList("4")); + Collections.singletonList("5")); /* Spec RSC7d3 * This test should not directly validate version against Defaults.ABLY_AGENT_VERSION, nor diff --git a/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java index a07b50d9c..f26fda4eb 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java @@ -81,7 +81,7 @@ public void header_lib_channel_publish() { * from those values. */ Assert.assertNotNull("Expected headers", headers); - Assert.assertEquals(headers.get("x-ably-version"), "4"); + Assert.assertEquals(headers.get("x-ably-version"), "5"); Assert.assertEquals(headers.get("ably-agent"), expectedAblyAgentHeader); // RSA7e2 Assert.assertNull("Shouldn't include 'x-ably-clientid' if `clientId` is not specified", headers.get("x-ably-clientid"));