From 55da34648590b97fccfcfb85fc6f9f79cc68c9d7 Mon Sep 17 00:00:00 2001 From: evgeny Date: Tue, 16 Sep 2025 13:28:01 +0100 Subject: [PATCH 1/2] feat: implement protocol v4 support for enhanced message annotations and versioning Introduce `MessageAnnotations` and `MessageVersion` classes for protocol v4. Replace `summary` with `annotations` in `Message`. Enhance version tracking with detailed metadata, ensuring compatibility with the new protocol. Update tests and protocol version to `4`. --- .../main/java/io/ably/lib/http/HttpCore.java | 6 +- .../io/ably/lib/realtime/ChannelBase.java | 18 +- .../main/java/io/ably/lib/rest/AblyBase.java | 33 ++- .../java/io/ably/lib/transport/Defaults.java | 2 +- .../main/java/io/ably/lib/types/Message.java | 211 ++-------------- .../io/ably/lib/types/MessageAnnotations.java | 134 ++++++++++ .../io/ably/lib/types/MessageVersion.java | 235 ++++++++++++++++++ .../main/java/io/ably/lib/types/Summary.java | 2 +- .../java/io/ably/lib/util/Serialisation.java | 4 + .../io/ably/lib/chat/ChatMessagesTest.java | 48 ++-- .../test/realtime/RealtimeHttpHeaderTest.java | 2 +- .../test/realtime/RealtimeMessageTest.java | 66 ++++- .../io/ably/lib/test/rest/HttpHeaderTest.java | 2 +- .../ably/lib/test/rest/RestRequestTest.java | 18 +- .../io/ably/lib/transport/DefaultsTest.java | 2 +- .../java/io/ably/lib/types/MessageTest.java | 74 +++--- 16 files changed, 570 insertions(+), 287 deletions(-) create mode 100644 lib/src/main/java/io/ably/lib/types/MessageAnnotations.java create mode 100644 lib/src/main/java/io/ably/lib/types/MessageVersion.java diff --git a/lib/src/main/java/io/ably/lib/http/HttpCore.java b/lib/src/main/java/io/ably/lib/http/HttpCore.java index f3e4cf46b..a6e102404 100644 --- a/lib/src/main/java/io/ably/lib/http/HttpCore.java +++ b/lib/src/main/java/io/ably/lib/http/HttpCore.java @@ -333,8 +333,10 @@ private Map collectRequestHeaders(URL url, String method, Param[ requestHeaders.put(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON); } - /* pass required headers */ - requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a + if (!requestHeaders.containsKey(Defaults.ABLY_PROTOCOL_VERSION_HEADER)) { + requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7e + } + Map additionalAgents = new HashMap<>(); if (options.agents != null) additionalAgents.putAll(options.agents); if (dynamicAgents != null) additionalAgents.putAll(dynamicAgents); diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b5d5c9938..d12114fcb 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -29,15 +29,17 @@ import io.ably.lib.types.DeltaExtras; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Message; -import io.ably.lib.types.MessageAction; +import io.ably.lib.types.MessageAnnotations; import io.ably.lib.types.MessageDecodeException; import io.ably.lib.types.MessageSerializer; +import io.ably.lib.types.MessageVersion; import io.ably.lib.types.PaginatedResult; import io.ably.lib.types.Param; import io.ably.lib.types.PresenceMessage; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.types.ProtocolMessage.Action; import io.ably.lib.types.ProtocolMessage.Flag; +import io.ably.lib.types.Summary; import io.ably.lib.util.CollectionUtils; import io.ably.lib.util.EventEmitter; import io.ably.lib.util.Log; @@ -901,10 +903,16 @@ private void onMessage(final ProtocolMessage protocolMessage) { if(msg.connectionId == null) msg.connectionId = protocolMessage.connectionId; if(msg.timestamp == 0) msg.timestamp = protocolMessage.timestamp; if(msg.id == null) msg.id = protocolMessage.id + ':' + i; - // (TM2k) - if(msg.serial == null && msg.version != null && msg.action == MessageAction.MESSAGE_CREATE) msg.serial = msg.version; - // (TM2o) - if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp; + // (TM2s) + if(msg.version == null) msg.version = new MessageVersion(msg.serial, msg.timestamp); + // (TM2s1) + if(msg.version.serial == null) msg.version.serial = msg.serial; + // (TM2s2) + if(msg.version.timestamp == 0) msg.version.timestamp = msg.timestamp; + // (TM2u) + if(msg.annotations == null) msg.annotations = new MessageAnnotations(); + // (TM8a) + if(msg.annotations.summary == null) msg.annotations.summary = new Summary(new HashMap<>()); try { if (msg.data != null) msg.decode(options, decodingContext); diff --git a/lib/src/main/java/io/ably/lib/rest/AblyBase.java b/lib/src/main/java/io/ably/lib/rest/AblyBase.java index 8782b9290..8d6560ca6 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyBase.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyBase.java @@ -14,6 +14,7 @@ import io.ably.lib.platform.Platform; import io.ably.lib.push.Push; import io.ably.lib.realtime.Connection; +import io.ably.lib.transport.Defaults; import io.ably.lib.types.AblyException; import io.ably.lib.types.AsyncHttpPaginatedResponse; import io.ably.lib.types.AsyncPaginatedResult; @@ -249,7 +250,14 @@ public PaginatedResult stats(Param[] params) throws AblyException { } PaginatedResult stats(Http http, Param[] params) throws AblyException { - return new PaginatedQuery<>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get(); + return new PaginatedQuery<>( + http, + "/stats", + // Stats api uses protocol v2 format for now + Param.set(HttpUtils.defaultAcceptHeaders(false), new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)), + params, + StatsReader.statsResponseHandler + ).get(); } /** @@ -276,8 +284,15 @@ public void statsAsync(Param[] params, Callback> cal statsAsync(http, params, callback); } - void statsAsync(Http http, Param[] params, Callback> callback) { - (new AsyncPaginatedQuery(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler)).get(callback); + void statsAsync(Http http, Param[] params, Callback> callback) { + (new AsyncPaginatedQuery( + http, + "/stats", + // Stats api uses protocol v2 format for now + Param.set(HttpUtils.defaultAcceptHeaders(false), new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)), + params, + StatsReader.statsResponseHandler + )).get(callback); } /** @@ -433,7 +448,12 @@ private Http.Request publishBatchImpl(final Message.Batch[] p public void execute(HttpScheduler http, final Callback callback) throws AblyException { HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs); final Param[] params = options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams ; // RSC7c - http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler() { + // This method uses an old batch format from protocol v2 + Param[] headers = Param.set( + HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), + new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) + ); + http.post("/messages", headers, params, requestBody, new HttpCore.ResponseHandler() { @Override public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException { if(error != null && error.code != 40020) { @@ -446,11 +466,6 @@ public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo er }); } - /** - * Authentication token has changed. waitForResult is true if there is a need to - * wait for server response to auth request - */ - /** * Override this method in AblyRealtime and pass updated token to ConnectionManager * @param token new token diff --git a/lib/src/main/java/io/ably/lib/transport/Defaults.java b/lib/src/main/java/io/ably/lib/transport/Defaults.java index 1c1b6c0a6..d55bce53e 100644 --- a/lib/src/main/java/io/ably/lib/transport/Defaults.java +++ b/lib/src/main/java/io/ably/lib/transport/Defaults.java @@ -12,7 +12,7 @@ public class Defaults { * spec: G4 *

*/ - public static final String ABLY_PROTOCOL_VERSION = "2"; + public static final String ABLY_PROTOCOL_VERSION = "4"; public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION); diff --git a/lib/src/main/java/io/ably/lib/types/Message.java b/lib/src/main/java/io/ably/lib/types/Message.java index 06e9c17c1..c73270b04 100644 --- a/lib/src/main/java/io/ably/lib/types/Message.java +++ b/lib/src/main/java/io/ably/lib/types/Message.java @@ -3,8 +3,6 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import com.google.gson.JsonArray; import com.google.gson.JsonDeserializer; @@ -50,17 +48,14 @@ public class Message extends BaseMessage { public String connectionKey; /** - * (TM2k) serial string – an opaque string that uniquely identifies the message. If a message received from Ably - * (whether over realtime or REST, eg history) with an action of MESSAGE_CREATE does not contain a serial, - * the SDK must set it equal to its version. + * (TM2r) serial string – an opaque string that uniquely identifies the message. */ public String serial; /** - * (TM2p) version string – an opaque string that uniquely identifies the message, and is different for different versions. - * (May not be populated depending on app & channel namespace settings) + * (TM2s) The latest version of the message, containing version-specific metadata. */ - public String version; + public MessageVersion version; /** * (TM2j) action enum @@ -68,113 +63,11 @@ public class Message extends BaseMessage { public MessageAction action; /** - * (TM2o) createdAt time in milliseconds since epoch. If a message received from Ably - * (whether over realtime or REST, eg history) with an action of MESSAGE_CREATE does not contain a createdAt, - * the SDK must set it equal to the TM2f timestamp. + * (TM2q) Annotations associated with this message. */ - public Long createdAt; + public MessageAnnotations annotations; - /** - * (TM2l) ref string – an opaque string that uniquely identifies some referenced message. - */ - public String refSerial; - - /** - * (TM2m) refType string – an opaque string that identifies the type of this reference. - */ - public String refType; - - /** - * (TM2n) operation object – data object that may contain the `optional` attributes. - */ - public Operation operation; - - /** - * (TM2q) A summary of all the annotations that have been made to the message, whose keys are the `type` fields - * from any annotations that it includes. Will always be populated for a message with action {@code MESSAGE_SUMMARY}, - * and may be populated for any other type (in particular a message retrieved from - * REST history will have its latest summary included). - */ - public Summary summary; - public static class Operation { - public String clientId; - public String description; - public Map metadata; - - void write(MessagePacker packer) throws IOException { - int fieldCount = 0; - if (clientId != null) fieldCount++; - if (description != null) fieldCount++; - if (metadata != null) fieldCount++; - - packer.packMapHeader(fieldCount); - - if (clientId != null) { - packer.packString("clientId"); - packer.packString(clientId); - } - if (description != null) { - packer.packString("description"); - packer.packString(description); - } - if (metadata != null) { - packer.packString("metadata"); - packer.packMapHeader(metadata.size()); - for (Map.Entry entry : metadata.entrySet()) { - packer.packString(entry.getKey()); - packer.packString(entry.getValue()); - } - } - } - - protected static Operation read(final MessageUnpacker unpacker) throws IOException { - Operation operation = new Operation(); - int fieldCount = unpacker.unpackMapHeader(); - for (int i = 0; i < fieldCount; i++) { - String fieldName = unpacker.unpackString().intern(); - switch (fieldName) { - case "clientId": - operation.clientId = unpacker.unpackString(); - break; - case "description": - operation.description = unpacker.unpackString(); - break; - case "metadata": - int mapSize = unpacker.unpackMapHeader(); - operation.metadata = new HashMap<>(mapSize); - for (int j = 0; j < mapSize; j++) { - String key = unpacker.unpackString(); - String value = unpacker.unpackString(); - operation.metadata.put(key, value); - } - break; - default: - unpacker.skipValue(); - break; - } - } - return operation; - } - - protected static Operation read(final JsonObject jsonObject) throws MessageDecodeException { - Operation operation = new Operation(); - if (jsonObject.has("clientId")) { - operation.clientId = jsonObject.get("clientId").getAsString(); - } - if (jsonObject.has("description")) { - operation.description = jsonObject.get("description").getAsString(); - } - if (jsonObject.has("metadata")) { - JsonObject metadataObject = jsonObject.getAsJsonObject("metadata"); - operation.metadata = new HashMap<>(); - for (Map.Entry entry : metadataObject.entrySet()) { - operation.metadata.put(entry.getKey(), entry.getValue().getAsString()); - } - } - return operation; - } - } private static final String NAME = "name"; private static final String EXTRAS = "extras"; @@ -182,11 +75,7 @@ protected static Operation read(final JsonObject jsonObject) throws MessageDecod private static final String SERIAL = "serial"; private static final String VERSION = "version"; private static final String ACTION = "action"; - private static final String CREATED_AT = "createdAt"; - private static final String REF_SERIAL = "refSerial"; - private static final String REF_TYPE = "refType"; - private static final String OPERATION = "operation"; - private static final String SUMMARY = "summary"; + private static final String ANNOTATIONS = "annotations"; /** * Default constructor @@ -270,11 +159,7 @@ void writeMsgpack(MessagePacker packer) throws IOException { if(serial != null) ++fieldCount; if(version != null) ++fieldCount; if(action != null) ++fieldCount; - if(createdAt != null) ++fieldCount; - if(refSerial != null) ++fieldCount; - if(refType != null) ++fieldCount; - if(operation != null) ++fieldCount; - if(summary != null) ++fieldCount; + if(annotations != null) ++fieldCount; packer.packMapHeader(fieldCount); super.writeFields(packer); @@ -296,31 +181,15 @@ void writeMsgpack(MessagePacker packer) throws IOException { } if(version != null) { packer.packString(VERSION); - packer.packString(version); + version.writeMsgpack(packer); } if(action != null) { packer.packString(ACTION); packer.packInt(action.ordinal()); } - if(createdAt != null) { - packer.packString(CREATED_AT); - packer.packLong(createdAt); - } - if(refSerial != null) { - packer.packString(REF_SERIAL); - packer.packString(refSerial); - } - if(refType != null) { - packer.packString(REF_TYPE); - packer.packString(refType); - } - if(operation != null) { - packer.packString(OPERATION); - operation.write(packer); - } - if(summary != null) { - packer.packString(SUMMARY); - summary.write(packer); + if(annotations != null) { + packer.packString(ANNOTATIONS); + annotations.writeMsgpack(packer); } } @@ -346,19 +215,11 @@ Message readMsgpack(MessageUnpacker unpacker) throws IOException { } else if (fieldName.equals(SERIAL)) { serial = unpacker.unpackString(); } else if (fieldName.equals(VERSION)) { - version = unpacker.unpackString(); + version = MessageVersion.fromMsgpack(unpacker); } else if (fieldName.equals(ACTION)) { action = MessageAction.tryFindByOrdinal(unpacker.unpackInt()); - } else if (fieldName.equals(CREATED_AT)) { - createdAt = unpacker.unpackLong(); - } else if (fieldName.equals(REF_SERIAL)) { - refSerial = unpacker.unpackString(); - } else if (fieldName.equals(REF_TYPE)) { - refType = unpacker.unpackString(); - } else if (fieldName.equals(OPERATION)) { - operation = Operation.read(unpacker); - } else if (fieldName.equals(SUMMARY)) { - summary = Summary.read(unpacker); + } else if (fieldName.equals(ANNOTATIONS)) { + annotations = MessageAnnotations.fromMsgpack(unpacker); } else { Log.v(TAG, "Unexpected field: " + fieldName); @@ -519,27 +380,17 @@ protected void read(final JsonObject map) throws MessageDecodeException { connectionKey = readString(map, CONNECTION_KEY); serial = readString(map, SERIAL); - version = readString(map, VERSION); + + final JsonElement versionElement = map.get(VERSION); + if (versionElement != null) { + version = MessageVersion.read(versionElement); + } Integer actionOrdinal = readInt(map, ACTION); action = actionOrdinal == null ? null : MessageAction.tryFindByOrdinal(actionOrdinal); - createdAt = readLong(map, CREATED_AT); - refSerial = readString(map, REF_SERIAL); - refType = readString(map, REF_TYPE); - - final JsonElement operationElement = map.get(OPERATION); - if (null != operationElement) { - if (!operationElement.isJsonObject()) { - throw MessageDecodeException.fromDescription("Message operation is of type \"" + operationElement.getClass() + "\" when expected a JSON object."); - } - operation = Operation.read(operationElement.getAsJsonObject()); - } - final JsonElement summaryElement = map.get(SUMMARY); - if (summaryElement != null) { - if (!summaryElement.isJsonObject()) { - throw MessageDecodeException.fromDescription("Message summary is of type \"" + summaryElement.getClass() + "\" when expected a JSON object."); - } - summary = Summary.read(summaryElement.getAsJsonObject()); + final JsonElement annotationsElement = map.get(ANNOTATIONS); + if (annotationsElement != null) { + annotations = MessageAnnotations.read(annotationsElement); } } @@ -560,25 +411,13 @@ public JsonElement serialize(Message message, Type typeOfMessage, JsonSerializat json.addProperty(SERIAL, message.serial); } if (message.version != null) { - json.addProperty(VERSION, message.version); + json.add(VERSION, message.version.toJsonTree()); } if (message.action != null) { json.addProperty(ACTION, message.action.ordinal()); } - if (message.createdAt != null) { - json.addProperty(CREATED_AT, message.createdAt); - } - if (message.refSerial != null) { - json.addProperty(REF_SERIAL, message.refSerial); - } - if (message.refType != null) { - json.addProperty(REF_TYPE, message.refType); - } - if (message.operation != null) { - json.add(OPERATION, Serialisation.gson.toJsonTree(message.operation)); - } - if (message.summary != null) { - json.add(SUMMARY, message.summary.toJsonTree()); + if (message.annotations != null) { + json.add(ANNOTATIONS, message.annotations.toJsonTree()); } return json; } diff --git a/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java b/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java new file mode 100644 index 000000000..ae4dddc23 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java @@ -0,0 +1,134 @@ +package io.ably.lib.types; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import io.ably.lib.util.Log; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.HashMap; + +/** + * Contains information about annotations associated with a particular message. + */ +public class MessageAnnotations { + + private static final String TAG = MessageAnnotations.class.getName(); + + private static final String SUMMARY = "summary"; + + /** + * A summary of all the annotations that have been made to the message. Will always be + * populated for a message.annotations.summary, and may be populated for any other type (in + * particular a message retrieved from REST history will have its latest summary + * included). + * The keys of the map are the annotation types. The exact structure of the value of + * each key depends on the aggregation part of the annotation type, e.g. for a type of + * reaction:distinct.v1, the value will be a DistinctValues object. New aggregation + * methods might be added serverside, hence the 'unknown' part of the sum type. + */ + public Summary summary; + + public MessageAnnotations() { + this.summary = new Summary(new HashMap<>()); + } + + public MessageAnnotations(Summary summary) { + this.summary = summary != null ? summary : new Summary(new HashMap<>()); + } + + void writeMsgpack(MessagePacker packer) throws IOException { + int fieldCount = 0; + if (summary != null) ++fieldCount; + + packer.packMapHeader(fieldCount); + + if (summary != null) { + packer.packString(SUMMARY); + summary.write(packer); + } + } + + MessageAnnotations readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if (fieldFormat.equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + continue; + } + + if (fieldName.equals(SUMMARY)) { + summary = Summary.read(unpacker); + } else { + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + } + } + + return this; + } + + static MessageAnnotations fromMsgpack(MessageUnpacker unpacker) throws IOException { + return (new MessageAnnotations()).readMsgpack(unpacker); + } + + protected void read(final JsonObject map) throws MessageDecodeException { + final JsonElement summaryElement = map.get(SUMMARY); + if (summaryElement != null) { + if (!summaryElement.isJsonObject()) { + throw MessageDecodeException.fromDescription("MessageAnnotations summary is of type \"" + summaryElement.getClass() + "\" when expected a JSON object."); + } + summary = Summary.read(summaryElement.getAsJsonObject()); + } + } + + static MessageAnnotations read(JsonElement json) throws MessageDecodeException { + if (!json.isJsonObject()) { + Log.w(TAG, "Message annotations is of type \"" + json.getClass() + "\" when expected a JSON object."); + } + + MessageAnnotations annotations = new MessageAnnotations(); + annotations.read(json.getAsJsonObject()); + return annotations; + } + + JsonElement toJsonTree() { + JsonObject json = new JsonObject(); + if (summary != null) { + json.add(SUMMARY, summary.toJsonTree()); + } + return json; + } + + public static class Serializer implements JsonSerializer, JsonDeserializer { + @Override + public JsonElement serialize(MessageAnnotations annotations, Type typeOfMessage, JsonSerializationContext ctx) { + return annotations.toJsonTree(); + } + + @Override + public MessageAnnotations deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + try { + return read(json); + } catch (MessageDecodeException e) { + Log.e(TAG, e.getMessage(), e); + throw new JsonParseException("Failed to deserialize MessageAnnotations from JSON.", e); + } + } + } + + @Override + public String toString() { + return "{MessageAnnotations summary=" + summary + "}"; + } +} diff --git a/lib/src/main/java/io/ably/lib/types/MessageVersion.java b/lib/src/main/java/io/ably/lib/types/MessageVersion.java new file mode 100644 index 000000000..eaf3ae009 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/MessageVersion.java @@ -0,0 +1,235 @@ +package io.ably.lib.types; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import io.ably.lib.util.Log; +import org.jetbrains.annotations.NotNull; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; + +/** + * Contains the details regarding the current version of the message - including when it was updated and by whom. + */ +public class MessageVersion { + + private static final String TAG = MessageVersion.class.getName(); + + private static final String SERIAL = "serial"; + private static final String TIMESTAMP = "timestamp"; + private static final String CLIENT_ID = "clientId"; + private static final String DESCRIPTION = "description"; + private static final String METADATA = "metadata"; + + /** + * A unique identifier for the version of the message, lexicographically-comparable with other versions (that + * share the same `Message.serial`). Will differ from the `Message.serial` only if the message has been + * updated or deleted. + */ + public String serial; + + /** + * The timestamp of the message version. + *

+ * If the `Message.action` is `message.create`, this will equal the `Message.timestamp`. + */ + public long timestamp; + + /** + * The client ID of the client that updated the message to this version. + */ + public String clientId; + + /** + * The description provided by the client that updated the message to this version. + */ + public String description; + + /** + * A map of string key-value pairs that may contain metadata associated with the operation to update + * the message to this version. + */ + public Map metadata; + + public MessageVersion() {} + + public MessageVersion(String serial, Long timestamp) { + this.serial = serial; + this.timestamp = timestamp; + } + + void writeMsgpack(MessagePacker packer) throws IOException { + int fieldCount = 0; + if (serial != null) ++fieldCount; + if (timestamp != 0) ++fieldCount; + if (clientId != null) fieldCount++; + if (description != null) fieldCount++; + if (metadata != null) fieldCount++; + + packer.packMapHeader(fieldCount); + + if (serial != null) { + packer.packString(SERIAL); + packer.packString(serial); + } + + if (timestamp != 0) { + packer.packString(TIMESTAMP); + packer.packLong(timestamp); + } + + if (clientId != null) { + packer.packString(CLIENT_ID); + packer.packString(clientId); + } + + if (description != null) { + packer.packString(DESCRIPTION); + packer.packString(description); + } + + if (metadata != null) { + packer.packString(METADATA); + packer.packMapHeader(metadata.size()); + for (Map.Entry entry : metadata.entrySet()) { + packer.packString(entry.getKey()); + packer.packString(entry.getValue()); + } + } + } + + MessageVersion readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if (fieldFormat.equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + continue; + } + + switch (fieldName) { + case SERIAL: + serial = unpacker.unpackString(); + break; + case TIMESTAMP: + timestamp = unpacker.unpackLong(); + break; + case CLIENT_ID: + clientId = unpacker.unpackString(); + break; + case DESCRIPTION: + description = unpacker.unpackString(); + break; + case METADATA: + int mapSize = unpacker.unpackMapHeader(); + metadata = new HashMap<>(mapSize); + for (int j = 0; j < mapSize; j++) { + String key = unpacker.unpackString(); + String value = unpacker.unpackString(); + metadata.put(key, value); + } + break; + default: + Log.v(TAG, "Unexpected field: " + fieldName); + unpacker.skipValue(); + break; + } + } + return this; + } + + static MessageVersion fromMsgpack(MessageUnpacker unpacker) throws IOException { + return (new MessageVersion()).readMsgpack(unpacker); + } + + protected void read(final JsonObject map) throws MessageDecodeException { + serial = readString(map, SERIAL); + timestamp = readLong(map, TIMESTAMP);; + clientId = readString(map, CLIENT_ID);; + description = readString(map, DESCRIPTION);; + if (map.has(METADATA)) { + JsonObject metadataObject = map.getAsJsonObject(METADATA); + metadata = new HashMap<>(); + for (Map.Entry entry : metadataObject.entrySet()) { + metadata.put(entry.getKey(), entry.getValue().getAsString()); + } + } + } + + static MessageVersion read(JsonElement json) throws MessageDecodeException { + if (!json.isJsonObject()) { + throw MessageDecodeException.fromDescription("Expected an object but got \"" + json.getClass() + "\"."); + } + + MessageVersion version = new MessageVersion(); + version.read(json.getAsJsonObject()); + return version; + } + + private String readString(JsonObject map, String key) { + JsonElement element = map.get(key); + return (element != null && !element.isJsonNull()) ? element.getAsString() : null; + } + + private long readLong(JsonObject map, String key) { + JsonElement element = map.get(key); + return (element != null && !element.isJsonNull()) ? element.getAsLong() : 0; + } + + JsonElement toJsonTree() { + JsonObject json = new JsonObject(); + if (serial != null) { + json.addProperty(SERIAL, serial); + } + if (timestamp != 0) { + json.addProperty(TIMESTAMP, timestamp); + } + if (clientId != null) { + json.addProperty(CLIENT_ID, clientId); + } + if (description != null) { + json.addProperty(DESCRIPTION, description); + } + if (metadata != null) { + JsonObject metadataObject = new JsonObject(); + for (Map.Entry entry : metadata.entrySet()) { + metadataObject.addProperty(entry.getKey(), entry.getValue()); + } + json.add(METADATA, metadataObject); + } + return json; + } + + public static class Serializer implements JsonSerializer, JsonDeserializer { + @Override + public JsonElement serialize(MessageVersion version, Type typeOfMessage, JsonSerializationContext ctx) { + return version.toJsonTree(); + } + + @Override + public MessageVersion deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + try { + return read(json); + } catch (MessageDecodeException e) { + Log.e(TAG, e.getMessage(), e); + throw new JsonParseException("Failed to deserialize MessageVersion from JSON.", e); + } + } + } + + @Override + public @NotNull String toString() { + return "{MessageVersion serial=" + serial + ", timestamp=" + timestamp + "}"; + } +} diff --git a/lib/src/main/java/io/ably/lib/types/Summary.java b/lib/src/main/java/io/ably/lib/types/Summary.java index 292fe67b6..1411c0f81 100644 --- a/lib/src/main/java/io/ably/lib/types/Summary.java +++ b/lib/src/main/java/io/ably/lib/types/Summary.java @@ -19,7 +19,7 @@ /** * A summary of all the annotations that have been made to the message. Will always be - * populated for a message.summary, and may be populated for any other type (in + * populated for a message.annotations.summary, and may be populated for any other type (in * particular a message retrieved from REST history will have its latest summary * included). * The keys of the map are the annotation types. The exact structure of the value of diff --git a/lib/src/main/java/io/ably/lib/util/Serialisation.java b/lib/src/main/java/io/ably/lib/util/Serialisation.java index 8397cd069..caf1237bd 100644 --- a/lib/src/main/java/io/ably/lib/util/Serialisation.java +++ b/lib/src/main/java/io/ably/lib/util/Serialisation.java @@ -15,7 +15,9 @@ import io.ably.lib.types.AnnotationAction; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAnnotations; import io.ably.lib.types.MessageExtras; +import io.ably.lib.types.MessageVersion; import io.ably.lib.types.PresenceMessage; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.types.Summary; @@ -51,8 +53,10 @@ public class Serialisation { gsonBuilder.registerTypeAdapter(PresenceMessage.class, new PresenceMessage.Serializer()); gsonBuilder.registerTypeAdapter(PresenceMessage.Action.class, new PresenceMessage.ActionSerializer()); gsonBuilder.registerTypeAdapter(ProtocolMessage.Action.class, new ProtocolMessage.ActionSerializer()); + gsonBuilder.registerTypeAdapter(MessageVersion.class, new MessageVersion.Serializer()); gsonBuilder.registerTypeAdapter(Annotation.class, new Annotation.Serializer()); gsonBuilder.registerTypeAdapter(AnnotationAction.class, new Annotation.ActionSerializer()); + gsonBuilder.registerTypeAdapter(MessageAnnotations.class, new MessageAnnotations.Serializer()); gsonBuilder.registerTypeAdapter(Summary.class, new Summary.Serializer()); gson = gsonBuilder.create(); diff --git a/lib/src/test/java/io/ably/lib/chat/ChatMessagesTest.java b/lib/src/test/java/io/ably/lib/chat/ChatMessagesTest.java index 940071a75..0c9636439 100644 --- a/lib/src/test/java/io/ably/lib/chat/ChatMessagesTest.java +++ b/lib/src/test/java/io/ably/lib/chat/ChatMessagesTest.java @@ -9,6 +9,7 @@ import io.ably.lib.types.ClientOptions; import io.ably.lib.types.Message; import io.ably.lib.types.MessageAction; +import io.ably.lib.types.MessageVersion; import org.junit.Assert; import org.junit.Test; @@ -93,12 +94,11 @@ public void test_room_message_is_published() { Assert.assertEquals(resultCreatedAt, String.valueOf(message.timestamp)); - Assert.assertEquals(resultCreatedAt, message.createdAt.toString()); Assert.assertEquals(resultSerial, message.serial); - Assert.assertEquals(resultSerial, message.version); + Assert.assertEquals(resultSerial, message.version.serial); Assert.assertEquals(MessageAction.MESSAGE_CREATE, message.action); - Assert.assertEquals(resultCreatedAt, message.createdAt.toString()); + Assert.assertEquals(resultCreatedAt, String.valueOf(message.version.timestamp)); } catch (Exception e) { e.printStackTrace(); @@ -200,13 +200,13 @@ public void test_room_message_is_updated() { Assert.assertEquals(1, metadata.getAsJsonObject("foo").get("bar").getAsInt()); Assert.assertEquals(originalSerial, updatedMessage.serial); - Assert.assertEquals(originalCreatedAt, updatedMessage.createdAt.toString()); + Assert.assertEquals(originalCreatedAt, String.valueOf(updatedMessage.timestamp)); - Assert.assertEquals(updateResultVersion, updatedMessage.version); - Assert.assertEquals(updateResultTimestamp, String.valueOf(updatedMessage.timestamp)); + Assert.assertEquals(updateResultVersion, updatedMessage.version.serial); + Assert.assertEquals(updateResultTimestamp, String.valueOf(updatedMessage.version.timestamp)); // updatedMessage contains `operation` with fields as clientId, description, metadata, assert for these fields - Message.Operation operation = updatedMessage.operation; + MessageVersion operation = updatedMessage.version; Assert.assertEquals("clientId1", operation.clientId); Assert.assertEquals("message updated by clientId1", operation.description); Assert.assertEquals(2, operation.metadata.size()); @@ -294,19 +294,19 @@ public void test_room_message_is_deleted() { Assert.assertEquals("clientId1", deletedMessage.clientId); Assert.assertEquals(originalSerial, deletedMessage.serial); - Assert.assertEquals(originalCreatedAt, deletedMessage.createdAt.toString()); + Assert.assertEquals(originalCreatedAt, String.valueOf(deletedMessage.timestamp)); - Assert.assertEquals(deleteResultVersion, deletedMessage.version); - Assert.assertEquals(deleteResultTimestamp, String.valueOf(deletedMessage.timestamp)); + Assert.assertEquals(deleteResultVersion, deletedMessage.version.serial); + Assert.assertEquals(deleteResultTimestamp, String.valueOf(deletedMessage.version.timestamp)); // deletedMessage contains `operation` with fields as clientId, reason - Message.Operation operation = deletedMessage.operation; - Assert.assertEquals("clientId1", operation.clientId); - Assert.assertEquals("message deleted by clientId1", operation.description); + MessageVersion version = deletedMessage.version; + Assert.assertEquals("clientId1", version.clientId); + Assert.assertEquals("message deleted by clientId1", version.description); // assert on metadata - Assert.assertEquals(2, operation.metadata.size()); - Assert.assertEquals("bar", operation.metadata.get("foo")); - Assert.assertEquals("hero", operation.metadata.get("naruto")); + Assert.assertEquals(2, version.metadata.size()); + Assert.assertEquals("bar", version.metadata.get("foo")); + Assert.assertEquals("hero", version.metadata.get("naruto")); } catch (Exception e) { e.printStackTrace(); @@ -405,8 +405,8 @@ public void test_room_message_create_update_delete() { JsonObject updatedData = (JsonObject) updatedMessage.data; Assert.assertEquals("updated text", updatedData.get("text").getAsString()); - Assert.assertEquals(updateResultVersion, updatedMessage.version); - Assert.assertEquals(updateResultTimestamp, String.valueOf(updatedMessage.timestamp)); + Assert.assertEquals(updateResultVersion, updatedMessage.version.serial); + Assert.assertEquals(updateResultTimestamp, String.valueOf(updatedMessage.version.timestamp)); // Verify the deleted message Message deletedMessage = receivedMsg.get(2); @@ -415,18 +415,18 @@ public void test_room_message_create_update_delete() { Assert.assertEquals("chat.message", deletedMessage.name); Assert.assertEquals("clientId1", deletedMessage.clientId); - Assert.assertEquals(deleteResultVersion, deletedMessage.version); - Assert.assertEquals(deleteResultTimestamp, String.valueOf(deletedMessage.timestamp)); + Assert.assertEquals(deleteResultVersion, deletedMessage.version.serial); + Assert.assertEquals(deleteResultTimestamp, String.valueOf(deletedMessage.version.timestamp)); // Check original serials Assert.assertEquals(originalSerial, createdMessage.serial); Assert.assertEquals(originalSerial, updatedMessage.serial); Assert.assertEquals(originalSerial, deletedMessage.serial); - // Check original message createdAt - Assert.assertEquals(originalCreatedAt, createdMessage.createdAt.toString()); - Assert.assertEquals(originalCreatedAt, updatedMessage.createdAt.toString()); - Assert.assertEquals(originalCreatedAt, deletedMessage.createdAt.toString()); + // Check original message timestamp + Assert.assertEquals(originalCreatedAt, String.valueOf(createdMessage.timestamp)); + Assert.assertEquals(originalCreatedAt, String.valueOf(updatedMessage.timestamp)); + Assert.assertEquals(originalCreatedAt, String.valueOf(deletedMessage.timestamp)); } catch (Exception e) { e.printStackTrace(); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java index 1e5f2f064..874f8b239 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java @@ -81,7 +81,7 @@ public void realtime_websocket_param_test() { * Defaults.ABLY_VERSION_PARAM, as ultimately the request param has been derived from those values. */ assertEquals("Verify correct version", requestParameters.get("v"), - Collections.singletonList("2")); + Collections.singletonList("4")); /* Spec RSC7d3 * This test should not directly validate version against Defaults.ABLY_AGENT_VERSION, nor diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java index dff2b1711..550148e3b 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import com.google.gson.Gson; import com.google.gson.JsonArray; @@ -22,8 +23,10 @@ import io.ably.lib.test.util.AblyCommonsReader; import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.MessageAction; +import io.ably.lib.types.MessageAnnotations; import io.ably.lib.types.MessageExtras; import io.ably.lib.types.Param; +import io.ably.lib.types.Summary; import io.ably.lib.util.Serialisation; import org.junit.Ignore; import org.junit.Rule; @@ -961,22 +964,18 @@ public void opaque_message_extras() throws AblyException { } /** - * Check that important chat SDK fields are populated (serial, action, createdAt) + * Check that important chat SDK fields are populated (serial, action, version) */ @Test - public void should_have_serial_action_createdAt() throws AblyException { + public void should_have_serial_action_version() throws AblyException { ClientOptions opts = createOptions(testVars.keys[7].keyStr); + AtomicReference receivedMessage = new AtomicReference<>(); opts.clientId = "chat"; try (AblyRealtime realtime = new AblyRealtime(opts)) { final Channel channel = realtime.channels.get("foo::$chat::$chatMessages"); CompletionWaiter msgComplete = new CompletionWaiter(); channel.subscribe(message -> { - assertNotNull(message.serial); - assertNotNull(message.version); - assertNotNull(message.createdAt); - assertEquals(MessageAction.MESSAGE_CREATE, message.action); - assertEquals("chat.message", message.name); - assertEquals("hello world!", ((JsonObject)message.data).get("text").getAsString()); + receivedMessage.set(message); msgComplete.onSuccess(); }); @@ -997,6 +996,13 @@ public void should_have_serial_action_createdAt() throws AblyException { // wait until we get message on the channel assertNull(msgComplete.waitFor(1, 10_000)); + Message message = receivedMessage.get(); + assertNotNull(message.serial); + assertNotNull(message.version); + assertEquals(message.timestamp, message.version.timestamp); + assertEquals(MessageAction.MESSAGE_CREATE, message.action); + assertEquals("chat.message", message.name); + assertEquals("hello world!", ((JsonObject)message.data).get("text").getAsString()); } } @@ -1039,4 +1045,48 @@ public void should_not_duplicate_messages() throws Exception { } } + /** + * Check that important chat SDK fields are populated (serial, action, version) + */ + @Test + public void should_have_annotations_and_versions() throws Exception { + ClientOptions opts = createOptions(testVars.keys[7].keyStr); + AtomicReference receivedMessage = new AtomicReference<>(); + try (AblyRealtime realtime = new AblyRealtime(opts)) { + final Channel channel = realtime.channels.get("should_have_annotations_and_versions"); + CompletionWaiter msgComplete = new CompletionWaiter(); + channel.subscribe(message -> { + receivedMessage.set(message); + msgComplete.onSuccess(); + }); + + CompletionWaiter attachListener = new CompletionWaiter(); + channel.attach(attachListener); + assertNull(attachListener.waitFor(1, 10_000)); + + /* publish to the channel */ + try (AblyRest rest = new AblyRest(opts)) { + Message[] messages = new Message[] { + new Message("message", "hello world!"), + }; + + rest.channels.get("should_have_annotations_and_versions").publish(messages); + } + + // wait until we get message on the channel + assertNull(msgComplete.waitFor(1, 10_000)); + Message message = receivedMessage.get(); + assertNotNull(message.version); + assertEquals(message.timestamp, message.version.timestamp); + assertNotNull(message.annotations); + assertNotNull(message.annotations.summary); + assertEquals( + Serialisation.gson.toJsonTree(message.annotations), + Serialisation.gson.toJsonTree(new MessageAnnotations(new Summary(new HashMap<>()))) + ); + assertEquals("message", message.name); + assertEquals("hello world!", message.data); + } + } + } diff --git a/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java index 55e47003b..a07b50d9c 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java @@ -81,7 +81,7 @@ public void header_lib_channel_publish() { * from those values. */ Assert.assertNotNull("Expected headers", headers); - Assert.assertEquals(headers.get("x-ably-version"), "2"); + Assert.assertEquals(headers.get("x-ably-version"), "4"); Assert.assertEquals(headers.get("ably-agent"), expectedAblyAgentHeader); // RSA7e2 Assert.assertNull("Shouldn't include 'x-ably-clientid' if `clientId` is not specified", headers.get("x-ably-clientid")); diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestRequestTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestRequestTest.java index 4121333ef..a1791a9bc 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestRequestTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestRequestTest.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.concurrent.TimeoutException; +import io.ably.lib.transport.Defaults; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -196,7 +197,9 @@ public void request_paginated() { AblyRest ably = new AblyRest(opts); Param[] params = new Param[] { new Param("prefix", channelNamePrefix) }; - HttpPaginatedResponse channelsResponse = ably.request(HttpConstants.Methods.GET, channelsPath, params, null, null); + Param[] requestHeaders = new Param[] { new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) }; + HttpPaginatedResponse channelsResponse = + ably.request(HttpConstants.Methods.GET, channelsPath, params, null, requestHeaders); /* check HttpPagninatedResponse details are present */ assertEquals("Verify statusCode is present", channelsResponse.statusCode, 200); @@ -239,7 +242,8 @@ public void request_paginated_async() { AblyRest ably = new AblyRest(opts); Param[] params = new Param[] { new Param("prefix", channelNamePrefix) }; - ably.requestAsync(HttpConstants.Methods.GET, channelsPath, params, null, null, new AsyncHttpPaginatedResponse.Callback() { + Param[] requestHeaders = new Param[] { new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) }; + ably.requestAsync(HttpConstants.Methods.GET, channelsPath, params, null, requestHeaders, new AsyncHttpPaginatedResponse.Callback() { @Override public void onResponse(AsyncHttpPaginatedResponse channelResponse) { @@ -308,7 +312,9 @@ public void request_paginated_limit() { AblyRest ably = new AblyRest(opts); Param[] params = new Param[] { new Param("prefix", channelNamePrefix), new Param("limit", "1") }; - HttpPaginatedResponse channelsResponse = ably.request(HttpConstants.Methods.GET, channelsPath, params, null, null); + Param[] requestHeaders = new Param[] { new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) }; + HttpPaginatedResponse channelsResponse = + ably.request(HttpConstants.Methods.GET, channelsPath, params, null, requestHeaders); /* check HttpPagninatedResponse details are present */ assertEquals("Verify statusCode is present", channelsResponse.statusCode, 200); @@ -370,7 +376,8 @@ public void request_paginated_async_limit() { AblyRest ably = new AblyRest(opts); Param[] params = new Param[] { new Param("prefix", channelNamePrefix), new Param("limit", "1") }; - ably.requestAsync(HttpConstants.Methods.GET, channelsPath, params, null, null, new AsyncHttpPaginatedResponse.Callback() { + Param[] headers = new Param[] { new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) }; + ably.requestAsync(HttpConstants.Methods.GET, channelsPath, params, null, headers, new AsyncHttpPaginatedResponse.Callback() { @Override public void onResponse(AsyncHttpPaginatedResponse channelsResponse) { @@ -533,7 +540,8 @@ public void request_post_async() { /* publish a message */ Message message = new Message("Test event", messageData); HttpUtils.JsonRequestBody requestBody = new HttpUtils.JsonRequestBody(message); - ably.requestAsync(HttpConstants.Methods.POST, channelMessagesPath, null, requestBody, null, new AsyncHttpPaginatedResponse.Callback() { + Param[] requestHeaders = new Param[] { new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) }; + ably.requestAsync(HttpConstants.Methods.POST, channelMessagesPath, null, requestBody, requestHeaders, new AsyncHttpPaginatedResponse.Callback() { @Override public void onResponse(AsyncHttpPaginatedResponse publishResponse) { diff --git a/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java b/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java index cdeec5dce..b88d78d81 100644 --- a/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java +++ b/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java @@ -9,7 +9,7 @@ public class DefaultsTest { @Test public void protocol_version_CSV2() { - assertThat(Defaults.ABLY_PROTOCOL_VERSION, is("2")); + assertThat(Defaults.ABLY_PROTOCOL_VERSION, is("4")); } @Test diff --git a/lib/src/test/java/io/ably/lib/types/MessageTest.java b/lib/src/test/java/io/ably/lib/types/MessageTest.java index 18dcf81d7..2bb5dd4a8 100644 --- a/lib/src/test/java/io/ably/lib/types/MessageTest.java +++ b/lib/src/test/java/io/ably/lib/types/MessageTest.java @@ -102,15 +102,13 @@ public void serialize_message_with_operation() { Message message = new Message("test-name", "test-data"); message.clientId = "test-client-id"; message.connectionKey = "test-key"; - message.refSerial = "test-ref-serial"; - message.refType = "test-ref-type"; - Message.Operation operation = new Message.Operation(); - operation.clientId = "operation-client-id"; - operation.description = "operation-description"; - operation.metadata = new HashMap<>(); - operation.metadata.put("key1", "value1"); - operation.metadata.put("key2", "value2"); - message.operation = operation; + MessageVersion version = new MessageVersion(); + version.clientId = "operation-client-id"; + version.description = "operation-description"; + version.metadata = new HashMap<>(); + version.metadata.put("key1", "value1"); + version.metadata.put("key2", "value2"); + message.version = version; // When JsonElement serializedElement = serializer.serialize(message, null, null); @@ -121,12 +119,10 @@ public void serialize_message_with_operation() { assertEquals("test-key", serializedObject.get("connectionKey").getAsString()); assertEquals("test-data", serializedObject.get("data").getAsString()); assertEquals("test-name", serializedObject.get("name").getAsString()); - assertEquals("test-ref-serial", serializedObject.get("refSerial").getAsString()); - assertEquals("test-ref-type", serializedObject.get("refType").getAsString()); - JsonObject operationObject = serializedObject.getAsJsonObject("operation"); - assertEquals("operation-client-id", operationObject.get("clientId").getAsString()); - assertEquals("operation-description", operationObject.get("description").getAsString()); - JsonObject metadataObject = operationObject.getAsJsonObject("metadata"); + JsonObject versionObject = serializedObject.getAsJsonObject("version"); + assertEquals("operation-client-id", versionObject.get("clientId").getAsString()); + assertEquals("operation-description", versionObject.get("description").getAsString()); + JsonObject metadataObject = versionObject.getAsJsonObject("metadata"); assertEquals("value1", metadataObject.get("key1").getAsString()); assertEquals("value2", metadataObject.get("key2").getAsString()); } @@ -138,17 +134,15 @@ public void deserialize_message_with_operation() throws Exception { jsonObject.addProperty("clientId", "test-client-id"); jsonObject.addProperty("data", "test-data"); jsonObject.addProperty("name", "test-name"); - jsonObject.addProperty("refSerial", "test-ref-serial"); - jsonObject.addProperty("refType", "test-ref-type"); jsonObject.addProperty("connectionKey", "test-key"); - JsonObject operationObject = new JsonObject(); - operationObject.addProperty("clientId", "operation-client-id"); - operationObject.addProperty("description", "operation-description"); + JsonObject versionObject = new JsonObject(); + versionObject.addProperty("clientId", "operation-client-id"); + versionObject.addProperty("description", "operation-description"); JsonObject metadataObject = new JsonObject(); metadataObject.addProperty("key1", "value1"); metadataObject.addProperty("key2", "value2"); - operationObject.add("metadata", metadataObject); - jsonObject.add("operation", operationObject); + versionObject.add("metadata", metadataObject); + jsonObject.add("version", versionObject); // When Message message = Message.fromEncoded(jsonObject, new ChannelOptions()); @@ -157,13 +151,11 @@ public void deserialize_message_with_operation() throws Exception { assertEquals("test-client-id", message.clientId); assertEquals("test-data", message.data); assertEquals("test-name", message.name); - assertEquals("test-ref-serial", message.refSerial); - assertEquals("test-ref-type", message.refType); assertEquals("test-key", message.connectionKey); - assertEquals("operation-client-id", message.operation.clientId); - assertEquals("operation-description", message.operation.description); - assertEquals("value1", message.operation.metadata.get("key1")); - assertEquals("value2", message.operation.metadata.get("key2")); + assertEquals("operation-client-id", message.version.clientId); + assertEquals("operation-description", message.version.description); + assertEquals("value1", message.version.metadata.get("key1")); + assertEquals("value2", message.version.metadata.get("key2")); } @Test @@ -193,17 +185,15 @@ public void serialize_and_deserialize_with_msgpack() throws Exception { Message message = new Message("test-name", "test-data"); message.clientId = "test-client-id"; message.connectionKey = "test-key"; - message.refSerial = "test-ref-serial"; - message.refType = "test-ref-type"; message.action = MessageAction.MESSAGE_CREATE; message.serial = "01826232498871-001@abcdefghij:001"; - Message.Operation operation = new Message.Operation(); - operation.clientId = "operation-client-id"; - operation.description = "operation-description"; - operation.metadata = new HashMap<>(); - operation.metadata.put("key1", "value1"); - operation.metadata.put("key2", "value2"); - message.operation = operation; + MessageVersion version = new MessageVersion(); + version.clientId = "operation-client-id"; + version.description = "operation-description"; + version.metadata = new HashMap<>(); + version.metadata.put("key1", "value1"); + version.metadata.put("key2", "value2"); + message.version = version; // When Encode to MessagePack ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -221,13 +211,11 @@ public void serialize_and_deserialize_with_msgpack() throws Exception { assertEquals("test-key", unpacked.connectionKey); assertEquals("test-data", unpacked.data); assertEquals("test-name", unpacked.name); - assertEquals("test-ref-serial", unpacked.refSerial); - assertEquals("test-ref-type", unpacked.refType); assertEquals(MessageAction.MESSAGE_CREATE, unpacked.action); assertEquals("01826232498871-001@abcdefghij:001", unpacked.serial); - assertEquals("operation-client-id", unpacked.operation.clientId); - assertEquals("operation-description", unpacked.operation.description); - assertEquals("value1", unpacked.operation.metadata.get("key1")); - assertEquals("value2", unpacked.operation.metadata.get("key2")); + assertEquals("operation-client-id", unpacked.version.clientId); + assertEquals("operation-description", unpacked.version.description); + assertEquals("value1", unpacked.version.metadata.get("key1")); + assertEquals("value2", unpacked.version.metadata.get("key2")); } } From d05b4dd758bad3d0360ce50301f1299bd44654af Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 24 Sep 2025 17:44:36 +0100 Subject: [PATCH 2/2] refactor: simplify `MessageVersion` and `MessageAnnotations` parsing, improve readability Make parsing methods static and refactor to use local variables, reducing reliance on instance-level mutation. Introduce `LEGACY_API_PROTOCOL_V2` constant for consistent versioning in legacy endpoints calls. --- .../main/java/io/ably/lib/rest/AblyBase.java | 22 ++++++-- .../main/java/io/ably/lib/types/Message.java | 4 +- .../io/ably/lib/types/MessageAnnotations.java | 30 +++++------ .../io/ably/lib/types/MessageVersion.java | 53 +++++++++---------- 4 files changed, 59 insertions(+), 50 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/rest/AblyBase.java b/lib/src/main/java/io/ably/lib/rest/AblyBase.java index 8d6560ca6..54bbb539e 100644 --- a/lib/src/main/java/io/ably/lib/rest/AblyBase.java +++ b/lib/src/main/java/io/ably/lib/rest/AblyBase.java @@ -45,6 +45,16 @@ */ public abstract class AblyBase implements AutoCloseable { + /** + * Some REST endpoints (e.g., stats and batch) changed in protocol v3. + * To preserve backward compatibility for those specific endpoints, we + * explicitly request protocol v2 when calling them. + *

+ * Use this only for legacy endpoints that must remain on v2; all other + * calls should use the default protocol version. + */ + private static final int LEGACY_API_PROTOCOL_V2 = 2; + public final ClientOptions options; public final Http http; public final HttpCore httpCore; @@ -254,7 +264,10 @@ PaginatedResult stats(Http http, Param[] params) throws AblyException { http, "/stats", // Stats api uses protocol v2 format for now - Param.set(HttpUtils.defaultAcceptHeaders(false), new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)), + Param.set( + HttpUtils.defaultAcceptHeaders(false), + new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2) + ), params, StatsReader.statsResponseHandler ).get(); @@ -289,7 +302,10 @@ void statsAsync(Http http, Param[] params, Callback> http, "/stats", // Stats api uses protocol v2 format for now - Param.set(HttpUtils.defaultAcceptHeaders(false), new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)), + Param.set( + HttpUtils.defaultAcceptHeaders(false), + new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2) + ), params, StatsReader.statsResponseHandler )).get(callback); @@ -451,7 +467,7 @@ public void execute(HttpScheduler http, final Callback callba // This method uses an old batch format from protocol v2 Param[] headers = Param.set( HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), - new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2) + new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2) ); http.post("/messages", headers, params, requestBody, new HttpCore.ResponseHandler() { @Override diff --git a/lib/src/main/java/io/ably/lib/types/Message.java b/lib/src/main/java/io/ably/lib/types/Message.java index c73270b04..a232a6e55 100644 --- a/lib/src/main/java/io/ably/lib/types/Message.java +++ b/lib/src/main/java/io/ably/lib/types/Message.java @@ -215,11 +215,11 @@ Message readMsgpack(MessageUnpacker unpacker) throws IOException { } else if (fieldName.equals(SERIAL)) { serial = unpacker.unpackString(); } else if (fieldName.equals(VERSION)) { - version = MessageVersion.fromMsgpack(unpacker); + version = MessageVersion.read(unpacker); } else if (fieldName.equals(ACTION)) { action = MessageAction.tryFindByOrdinal(unpacker.unpackInt()); } else if (fieldName.equals(ANNOTATIONS)) { - annotations = MessageAnnotations.fromMsgpack(unpacker); + annotations = MessageAnnotations.read(unpacker); } else { Log.v(TAG, "Unexpected field: " + fieldName); diff --git a/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java b/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java index ae4dddc23..a8d3b9bfb 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java +++ b/lib/src/main/java/io/ably/lib/types/MessageAnnotations.java @@ -57,7 +57,9 @@ void writeMsgpack(MessagePacker packer) throws IOException { } } - MessageAnnotations readMsgpack(MessageUnpacker unpacker) throws IOException { + static MessageAnnotations read(MessageUnpacker unpacker) throws IOException { + MessageAnnotations annotations = new MessageAnnotations(); + int fieldCount = unpacker.unpackMapHeader(); for (int i = 0; i < fieldCount; i++) { String fieldName = unpacker.unpackString().intern(); @@ -68,37 +70,31 @@ MessageAnnotations readMsgpack(MessageUnpacker unpacker) throws IOException { } if (fieldName.equals(SUMMARY)) { - summary = Summary.read(unpacker); + annotations.summary = Summary.read(unpacker); } else { Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); } } - return this; + return annotations; } - static MessageAnnotations fromMsgpack(MessageUnpacker unpacker) throws IOException { - return (new MessageAnnotations()).readMsgpack(unpacker); - } + static MessageAnnotations read(JsonElement json) throws MessageDecodeException { + if (!json.isJsonObject()) { + throw MessageDecodeException.fromDescription("Message annotations is of type \"" + json.getClass() + "\" when expected a JSON object."); + } - protected void read(final JsonObject map) throws MessageDecodeException { - final JsonElement summaryElement = map.get(SUMMARY); + MessageAnnotations annotations = new MessageAnnotations(); + + final JsonElement summaryElement = json.getAsJsonObject().get(SUMMARY); if (summaryElement != null) { if (!summaryElement.isJsonObject()) { throw MessageDecodeException.fromDescription("MessageAnnotations summary is of type \"" + summaryElement.getClass() + "\" when expected a JSON object."); } - summary = Summary.read(summaryElement.getAsJsonObject()); + annotations.summary = Summary.read(summaryElement.getAsJsonObject()); } - } - static MessageAnnotations read(JsonElement json) throws MessageDecodeException { - if (!json.isJsonObject()) { - Log.w(TAG, "Message annotations is of type \"" + json.getClass() + "\" when expected a JSON object."); - } - - MessageAnnotations annotations = new MessageAnnotations(); - annotations.read(json.getAsJsonObject()); return annotations; } diff --git a/lib/src/main/java/io/ably/lib/types/MessageVersion.java b/lib/src/main/java/io/ably/lib/types/MessageVersion.java index eaf3ae009..d98603779 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageVersion.java +++ b/lib/src/main/java/io/ably/lib/types/MessageVersion.java @@ -108,7 +108,8 @@ void writeMsgpack(MessagePacker packer) throws IOException { } } - MessageVersion readMsgpack(MessageUnpacker unpacker) throws IOException { + static MessageVersion read(MessageUnpacker unpacker) throws IOException { + MessageVersion version = new MessageVersion(); int fieldCount = unpacker.unpackMapHeader(); for (int i = 0; i < fieldCount; i++) { String fieldName = unpacker.unpackString().intern(); @@ -120,24 +121,24 @@ MessageVersion readMsgpack(MessageUnpacker unpacker) throws IOException { switch (fieldName) { case SERIAL: - serial = unpacker.unpackString(); + version.serial = unpacker.unpackString(); break; case TIMESTAMP: - timestamp = unpacker.unpackLong(); + version.timestamp = unpacker.unpackLong(); break; case CLIENT_ID: - clientId = unpacker.unpackString(); + version.clientId = unpacker.unpackString(); break; case DESCRIPTION: - description = unpacker.unpackString(); + version.description = unpacker.unpackString(); break; case METADATA: int mapSize = unpacker.unpackMapHeader(); - metadata = new HashMap<>(mapSize); + version.metadata = new HashMap<>(mapSize); for (int j = 0; j < mapSize; j++) { String key = unpacker.unpackString(); String value = unpacker.unpackString(); - metadata.put(key, value); + version.metadata.put(key, value); } break; default: @@ -146,26 +147,9 @@ MessageVersion readMsgpack(MessageUnpacker unpacker) throws IOException { break; } } - return this; + return version; } - static MessageVersion fromMsgpack(MessageUnpacker unpacker) throws IOException { - return (new MessageVersion()).readMsgpack(unpacker); - } - - protected void read(final JsonObject map) throws MessageDecodeException { - serial = readString(map, SERIAL); - timestamp = readLong(map, TIMESTAMP);; - clientId = readString(map, CLIENT_ID);; - description = readString(map, DESCRIPTION);; - if (map.has(METADATA)) { - JsonObject metadataObject = map.getAsJsonObject(METADATA); - metadata = new HashMap<>(); - for (Map.Entry entry : metadataObject.entrySet()) { - metadata.put(entry.getKey(), entry.getValue().getAsString()); - } - } - } static MessageVersion read(JsonElement json) throws MessageDecodeException { if (!json.isJsonObject()) { @@ -173,16 +157,29 @@ static MessageVersion read(JsonElement json) throws MessageDecodeException { } MessageVersion version = new MessageVersion(); - version.read(json.getAsJsonObject()); + JsonObject map = json.getAsJsonObject(); + + version.serial = readString(map, SERIAL); + version.timestamp = readLong(map, TIMESTAMP);; + version.clientId = readString(map, CLIENT_ID);; + version.description = readString(map, DESCRIPTION);; + if (map.has(METADATA)) { + JsonObject metadataObject = map.getAsJsonObject(METADATA); + version.metadata = new HashMap<>(); + for (Map.Entry entry : metadataObject.entrySet()) { + version.metadata.put(entry.getKey(), entry.getValue().getAsString()); + } + } + return version; } - private String readString(JsonObject map, String key) { + private static String readString(JsonObject map, String key) { JsonElement element = map.get(key); return (element != null && !element.isJsonNull()) ? element.getAsString() : null; } - private long readLong(JsonObject map, String key) { + private static long readLong(JsonObject map, String key) { JsonElement element = map.get(key); return (element != null && !element.isJsonNull()) ? element.getAsLong() : 0; }