Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/src/main/java/io/ably/lib/http/HttpCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,10 @@ private Map<String, String> collectRequestHeaders(URL url, String method, Param[
requestHeaders.put(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON);
}

/* pass required headers */
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
if (!requestHeaders.containsKey(Defaults.ABLY_PROTOCOL_VERSION_HEADER)) {
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7e
}

Map<String, String> additionalAgents = new HashMap<>();
if (options.agents != null) additionalAgents.putAll(options.agents);
if (dynamicAgents != null) additionalAgents.putAll(dynamicAgents);
Expand Down
18 changes: 13 additions & 5 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@
import io.ably.lib.types.DeltaExtras;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Message;
import io.ably.lib.types.MessageAction;
import io.ably.lib.types.MessageAnnotations;
import io.ably.lib.types.MessageDecodeException;
import io.ably.lib.types.MessageSerializer;
import io.ably.lib.types.MessageVersion;
import io.ably.lib.types.PaginatedResult;
import io.ably.lib.types.Param;
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ProtocolMessage.Action;
import io.ably.lib.types.ProtocolMessage.Flag;
import io.ably.lib.types.Summary;
import io.ably.lib.util.CollectionUtils;
import io.ably.lib.util.EventEmitter;
import io.ably.lib.util.Log;
Expand Down Expand Up @@ -901,10 +903,16 @@ private void onMessage(final ProtocolMessage protocolMessage) {
if(msg.connectionId == null) msg.connectionId = protocolMessage.connectionId;
if(msg.timestamp == 0) msg.timestamp = protocolMessage.timestamp;
if(msg.id == null) msg.id = protocolMessage.id + ':' + i;
// (TM2k)
if(msg.serial == null && msg.version != null && msg.action == MessageAction.MESSAGE_CREATE) msg.serial = msg.version;
// (TM2o)
if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp;
// (TM2s)
if(msg.version == null) msg.version = new MessageVersion(msg.serial, msg.timestamp);
// (TM2s1)
if(msg.version.serial == null) msg.version.serial = msg.serial;
// (TM2s2)
if(msg.version.timestamp == 0) msg.version.timestamp = msg.timestamp;
// (TM2u)
if(msg.annotations == null) msg.annotations = new MessageAnnotations();
// (TM8a)
if(msg.annotations.summary == null) msg.annotations.summary = new Summary(new HashMap<>());

try {
if (msg.data != null) msg.decode(options, decodingContext);
Expand Down
49 changes: 40 additions & 9 deletions lib/src/main/java/io/ably/lib/rest/AblyBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.ably.lib.platform.Platform;
import io.ably.lib.push.Push;
import io.ably.lib.realtime.Connection;
import io.ably.lib.transport.Defaults;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.AsyncHttpPaginatedResponse;
import io.ably.lib.types.AsyncPaginatedResult;
Expand Down Expand Up @@ -44,6 +45,16 @@
*/
public abstract class AblyBase implements AutoCloseable {

/**
* Some REST endpoints (e.g., stats and batch) changed in protocol v3.
* To preserve backward compatibility for those specific endpoints, we
* explicitly request protocol v2 when calling them.
* <p>
* Use this only for legacy endpoints that must remain on v2; all other
* calls should use the default protocol version.
*/
private static final int LEGACY_API_PROTOCOL_V2 = 2;

public final ClientOptions options;
public final Http http;
public final HttpCore httpCore;
Expand Down Expand Up @@ -249,7 +260,17 @@ public PaginatedResult<Stats> stats(Param[] params) throws AblyException {
}

PaginatedResult<Stats> stats(Http http, Param[] params) throws AblyException {
return new PaginatedQuery<>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
return new PaginatedQuery<>(
http,
"/stats",
// Stats api uses protocol v2 format for now
Param.set(
HttpUtils.defaultAcceptHeaders(false),
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2)
),
params,
StatsReader.statsResponseHandler
).get();
}

/**
Expand All @@ -276,8 +297,18 @@ public void statsAsync(Param[] params, Callback<AsyncPaginatedResult<Stats>> cal
statsAsync(http, params, callback);
}

void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
(new AsyncPaginatedQuery<Stats>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler)).get(callback);
void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
(new AsyncPaginatedQuery<Stats>(
http,
"/stats",
// Stats api uses protocol v2 format for now
Param.set(
HttpUtils.defaultAcceptHeaders(false),
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2)
),
params,
StatsReader.statsResponseHandler
)).get(callback);
}

/**
Expand Down Expand Up @@ -433,7 +464,12 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
final Param[] params = options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams ; // RSC7c
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
// This method uses an old batch format from protocol v2
Param[] headers = Param.set(
HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol),
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2)
);
http.post("/messages", headers, params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
@Override
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
if(error != null && error.code != 40020) {
Expand All @@ -446,11 +482,6 @@ public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo er
});
}

/**
* Authentication token has changed. waitForResult is true if there is a need to
* wait for server response to auth request
*/

/**
* Override this method in AblyRealtime and pass updated token to ConnectionManager
* @param token new token
Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/transport/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class Defaults {
* spec: G4
* </p>
*/
public static final String ABLY_PROTOCOL_VERSION = "2";
public static final String ABLY_PROTOCOL_VERSION = "4";

public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION);

Expand Down
Loading
Loading