diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 86dfd9d1..4f7a1dd1 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "AsioDefines.h" @@ -31,6 +32,7 @@ #include "ConsumerImpl.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "MockServer.h" #include "OpSendMsg.h" #include "ProducerImpl.h" #include "PulsarApi.pb.h" @@ -1005,15 +1007,17 @@ Future ClientConnection::newConsumerStats(uint6 void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, const std::string& listenerName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise); + newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP", + promise); } void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise); + newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA", + promise); } -void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, +void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, const LookupDataResultPromisePtr& promise) { Lock lock(mutex_); std::shared_ptr lookupDataResult; @@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, pendingLookupRequests_.insert(std::make_pair(requestId, requestData)); numOfPendingLookupRequest_++; lock.unlock(); + LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); } @@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() { } } -Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) { +Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); Promise promise; + LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId + << ") to a closed connection"); promise.setFailed(ResultNotConnected); return promise.getFuture(); } @@ -1182,7 +1190,17 @@ Future ClientConnection::sendRequestWithId(const SharedBuf pendingRequests_.insert(std::make_pair(requestId, requestData)); lock.unlock(); - sendCommand(cmd); + LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); + if (mockingRequests_.load(std::memory_order_acquire)) { + if (mockServer_ == nullptr) { + LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType); + sendCommand(cmd); + } else if (!mockServer_->sendRequest(requestType, requestId)) { + sendCommand(cmd); + } + } else { + sendCommand(cmd); + } return requestData.promise.getFuture(); } @@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse( void ClientConnection::handleLookupTopicRespose( const proto::CommandLookupTopicResponse& lookupTopicResponse) { - LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: " - << lookupTopicResponse.request_id()); - Lock lock(mutex_); auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); if (it != pendingLookupRequests_.end()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 18a7d846..aae53d23 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -115,6 +115,7 @@ struct ResponseData { typedef std::shared_ptr> NamespaceTopicsPtr; +class MockServer; class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this { enum State : uint8_t { @@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this; public: typedef std::shared_ptr SocketPtr; @@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this sendRequestWithId(const SharedBuffer& cmd, int requestId); + Future sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType); const std::string& brokerAddress() const; @@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId); + void attachMockServer(const std::shared_ptr& mockServer) { + mockServer_ = mockServer; + // Mark that requests will first go through the mock server, if the mock server cannot process it, + // fall back to the normal logic + mockingRequests_.store(true, std::memory_order_release); + } + private: struct PendingRequestData { Promise promise; @@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this state_{Pending}; TimeDuration operationsTimeout_; AuthenticationPtr authentication_; @@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; + void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); void startConsumerStatsTimer(std::vector consumerStatsRequests); @@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_thissendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_); } else { @@ -235,8 +236,9 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after // sending the subscribe request. cnx->registerConsumer(consumerId_, get_shared_this_ptr()); + LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); - if (duringSeek()) { + if (hasPendingSeek_.load(std::memory_order_acquire)) { ackGroupingTrackerPtr_->flushAndClean(); } @@ -259,7 +261,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = get_shared_this_ptr(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateConsumer(cnx, result); if (handleResult == ResultOk) { @@ -267,6 +269,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c } else { promise.setFailed(handleResult); } + completeSeekCallback(ResultOk); }); return promise.getFuture(); @@ -301,7 +304,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); int requestId = client->newRequestId(); auto name = getName(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER") .addListener([name](Result result, const ResponseData&) { if (result == ResultOk) { LOG_INFO(name << "Closed consumer successfully after subscribe completed"); @@ -354,7 +358,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result // in case it was indeed created, otherwise it might prevent new subscribe operation, // since we are not closing the connection auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); } if (consumerCreatedPromise_.isComplete()) { @@ -408,7 +413,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } else { Result result = ResultNotConnected; @@ -1125,7 +1130,7 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * `startMessageId_` is updated so that we can discard messages after delivery restarts. */ void ConsumerImpl::clearReceiveQueue() { - if (duringSeek()) { + if (hasPendingSeek_.load(std::memory_order_acquire)) { if (hasSoughtByTimestamp()) { // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. @@ -1133,11 +1138,6 @@ void ConsumerImpl::clearReceiveQueue() { } else { startMessageId_ = seekMessageId_.get(); } - SeekStatus expected = SeekStatus::COMPLETED; - if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) { - auto seekCallback = seekCallback_.release(); - executor_->postWork([seekCallback] { seekCallback(ResultOk); }); - } return; } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { return; @@ -1374,7 +1374,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } @@ -1550,7 +1550,9 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb } const auto requestId = newRequestId(); - seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; + seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, + std::move(nonNullCallback)); } void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) { @@ -1564,8 +1566,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) } const auto requestId = newRequestId(); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp}, - callback); + std::move(nonNullCallback)); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } @@ -1723,7 +1726,7 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback) { + ResultCallback&& callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); @@ -1731,10 +1734,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c return; } - auto expected = SeekStatus::NOT_STARTED; - if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) { - LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is " - << static_cast(expected)); + auto expected = false; + if (!hasPendingSeek_.compare_exchange_strong(expected, true)) { + LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek"); callback(ResultNotAllowedError); return; } @@ -1746,13 +1748,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c seekMessageId_ = *boost::get(&seekArg); hasSoughtByTimestamp_.store(false, std::memory_order_release); } - seekStatus_ = SeekStatus::IN_PROGRESS; - seekCallback_ = callback; + seekCallback_ = std::move(callback); LOG_INFO(getName() << " Seeking subscription to " << seekArg); auto weakSelf = weak_from_this(); - cnx->sendRequestWithId(seek, requestId) + cnx->sendRequestWithId(seek, requestId, "SEEK") .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, const ResponseData& responseData) { auto self = weakSelf.lock(); @@ -1767,20 +1768,17 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c Lock lock(mutexForMessageId_); lastDequedMessageId_ = MessageId::earliest(); lock.unlock(); - if (getCnx().expired()) { - // It's during reconnection, complete the seek future after connection is established - seekStatus_ = SeekStatus::COMPLETED; - } else { + + if (!getCnx().expired()) { if (!hasSoughtByTimestamp()) { startMessageId_ = seekMessageId_.get(); } - seekCallback_.release()(result); - } + completeSeekCallback(result); + } // else: complete the seek future after connection is established } else { LOG_ERROR(getName() << "Failed to seek: " << result); seekMessageId_ = originalSeekMessageId; - seekStatus_ = SeekStatus::NOT_STARTED; - seekCallback_.release()(result); + completeSeekCallback(result); } }); } @@ -1928,7 +1926,7 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI auto requestId = newRequestId(); cnx->sendRequestWithId( Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId), - requestId) + requestId, "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); @@ -1958,7 +1956,8 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { if (config_.isAckReceiptEnabled()) { auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId) + cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId, + "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 63eb51d6..9a1fe7cb 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -21,6 +21,7 @@ #include +#include #include #include #include @@ -77,13 +78,6 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; -enum class SeekStatus : std::uint8_t -{ - NOT_STARTED, - IN_PROGRESS, - COMPLETED -}; - class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr& client, const std::string& topic, const std::string& subscriptionName, @@ -230,7 +224,15 @@ class ConsumerImpl : public ConsumerImplBase { } void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback); + ResultCallback&& callback); + void completeSeekCallback(Result result) { + bool expected = true; + if (hasPendingSeek_.compare_exchange_strong(expected, false)) { + if (auto callback = seekCallback_.release()) { + callback(result); + } + } + } void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb); std::mutex mutexForReceiveWithZeroQueueSize; @@ -274,14 +276,13 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; - std::atomic seekStatus_{SeekStatus::NOT_STARTED}; Synchronized seekCallback_{[](Result) {}}; Synchronized> startMessageId_; + std::atomic_bool hasPendingSeek_{false}; Synchronized seekMessageId_{MessageId::earliest()}; std::atomic hasSoughtByTimestamp_{false}; bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); } - bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; } class ChunkedMessageCtx { public: diff --git a/lib/MockServer.h b/lib/MockServer.h new file mode 100644 index 00000000..259eecfd --- /dev/null +++ b/lib/MockServer.h @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "ClientConnection.h" +#include "ConsumerImpl.h" +#include "ExecutorService.h" +#include "LogUtils.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +class MockServer { + public: + using RequestDelayType = std::unordered_map; + + MockServer(const ClientConnectionPtr& connection) : connection_(connection) {} + + void setRequestDelay(std::initializer_list delays) { + std::lock_guard lock(mutex_); + for (auto&& delay : delays) { + requestDelays_[delay.first] = delay.second; + } + } + + bool sendRequest(const std::string& request, uint64_t requestId) { + auto connection = connection_.lock(); + if (!connection) { + return false; + } + std::lock_guard lock(mutex_); + if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) { + // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers + if (request == "SEEK") { + connection->executor_->postWork([connection] { + std::vector consumerIds; + { + std::lock_guard lock{connection->mutex_}; + for (auto&& kv : connection->consumers_) { + if (auto consumer = kv.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + connection->handleCloseConsumer(closeConsumerCmd); + } + }); + } + long delayMs = iter->second; + auto timer = connection->executor_->createDeadlineTimer(); + timer->expires_from_now(std::chrono::milliseconds(delayMs)); + timer->async_wait([connection, requestId, request, timer](const auto& ec) { + if (ec) { + LOG_INFO("Timer cancelled for request " << request << " with id " << requestId); + return; + } + if (connection->isClosed()) { + LOG_INFO("Connection is closed, not completing request " << request << " with id " + << requestId); + return; + } + LOG_INFO("Completing delayed request " << request << " with id " << requestId); + proto::CommandSuccess success; + success.set_request_id(requestId); + connection->handleSuccess(success); + }); + return true; + } else { + return false; + } + } + + private: + mutable std::mutex mutex_; + std::unordered_map requestDelays_; + ClientConnectionWeakPtr connection_; + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 9d6a9a08..7fd14c7c 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -160,7 +160,7 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = shared_from_this(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "PRODUCER") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateProducer(cnx, result, responseData); if (handleResult == ResultOk) { @@ -204,7 +204,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } if (!producerCreatedPromise_.isComplete()) { @@ -266,7 +267,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } @@ -818,7 +820,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { int requestId = client->newRequestId(); auto self = shared_from_this(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, "CLOSE_PRODUCER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } diff --git a/lib/Synchronized.h b/lib/Synchronized.h index 5449a9fe..68ae37be 100644 --- a/lib/Synchronized.h +++ b/lib/Synchronized.h @@ -41,6 +41,12 @@ class Synchronized { return *this; } + Synchronized& operator=(T&& value) { + std::lock_guard lock(mutex_); + value_ = value; + return *this; + } + private: T value_; mutable std::mutex mutex_; diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index f03ea5e3..c3589a93 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -19,12 +19,18 @@ #include #include +#include +#include +#include #include #include #include #include "HttpHelper.h" +#include "lib/ClientConnection.h" #include "lib/LogUtils.h" +#include "lib/MockServer.h" +#include "tests/PulsarFriend.h" DECLARE_LOG_OBJECT() @@ -200,6 +206,50 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } +static void assertSeekWithTimeout(Consumer& consumer) { + using namespace std::chrono_literals; + auto promise = std::make_shared>(); + consumer.seekAsync(0L, [promise](Result result) { promise->set_value(result); }); + auto future = promise->get_future(); + ASSERT_EQ(future.wait_for(5s), std::future_status::ready); + ASSERT_EQ(future.get(), ResultOk); +} + +// Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response +TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}}); + assertSeekWithTimeout(consumer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}}); + assertSeekWithTimeout(consumer); + + client.close(); +} + +TEST_F(ConsumerSeekTest, testReconnectionSlow) { + Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500)); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + // Make seek response received before `connectionOpened` is called + mockServer->setRequestDelay({{"SEEK", 100}}); + assertSeekWithTimeout(consumer); + + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); } // namespace pulsar