From 2f6ab2b67c6f107b0cef477ac86b708a285f45fa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 18 Dec 2025 21:51:21 +0800 Subject: [PATCH 1/4] add debug logs for request insertion --- lib/ClientConnection.cc | 18 +++++++++++------- lib/ClientConnection.h | 6 ++++-- lib/ConsumerImpl.cc | 23 ++++++++++++++--------- lib/ProducerImpl.cc | 10 ++++++---- 4 files changed, 35 insertions(+), 22 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 86dfd9d1..de73ccca 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1005,15 +1005,17 @@ Future ClientConnection::newConsumerStats(uint6 void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, const std::string& listenerName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise); + newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP", + promise); } void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise); + newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA", + promise); } -void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, +void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, const LookupDataResultPromisePtr& promise) { Lock lock(mutex_); std::shared_ptr lookupDataResult; @@ -1042,6 +1044,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, pendingLookupRequests_.insert(std::make_pair(requestId, requestData)); numOfPendingLookupRequest_++; lock.unlock(); + LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); } @@ -1158,12 +1161,15 @@ void ClientConnection::sendPendingCommands() { } } -Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) { +Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); Promise promise; + LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId + << ") to a closed connection"); promise.setFailed(ResultNotConnected); return promise.getFuture(); } @@ -1182,6 +1188,7 @@ Future ClientConnection::sendRequestWithId(const SharedBuf pendingRequests_.insert(std::make_pair(requestId, requestData)); lock.unlock(); + LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); return requestData.promise.getFuture(); } @@ -1625,9 +1632,6 @@ void ClientConnection::handleConsumerStatsResponse( void ClientConnection::handleLookupTopicRespose( const proto::CommandLookupTopicResponse& lookupTopicResponse) { - LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: " - << lookupTopicResponse.request_id()); - Lock lock(mutex_); auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); if (it != pendingLookupRequests_.end()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 18a7d846..0d2085cb 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -185,7 +185,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this sendRequestWithId(const SharedBuffer& cmd, int requestId); + Future sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType); const std::string& brokerAddress() const; @@ -264,7 +265,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_thissendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_); } else { @@ -235,6 +236,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after // sending the subscribe request. cnx->registerConsumer(consumerId_, get_shared_this_ptr()); + LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); if (duringSeek()) { ackGroupingTrackerPtr_->flushAndClean(); @@ -259,7 +261,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = get_shared_this_ptr(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateConsumer(cnx, result); if (handleResult == ResultOk) { @@ -301,7 +303,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); int requestId = client->newRequestId(); auto name = getName(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER") .addListener([name](Result result, const ResponseData&) { if (result == ResultOk) { LOG_INFO(name << "Closed consumer successfully after subscribe completed"); @@ -354,7 +357,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result // in case it was indeed created, otherwise it might prevent new subscribe operation, // since we are not closing the connection auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); } if (consumerCreatedPromise_.isComplete()) { @@ -408,7 +412,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } else { Result result = ResultNotConnected; @@ -1374,7 +1378,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } @@ -1752,7 +1756,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c auto weakSelf = weak_from_this(); - cnx->sendRequestWithId(seek, requestId) + cnx->sendRequestWithId(seek, requestId, "SEEK") .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, const ResponseData& responseData) { auto self = weakSelf.lock(); @@ -1928,7 +1932,7 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI auto requestId = newRequestId(); cnx->sendRequestWithId( Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId), - requestId) + requestId, "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); @@ -1958,7 +1962,8 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { if (config_.isAckReceiptEnabled()) { auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId) + cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId, + "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 9d6a9a08..7fd14c7c 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -160,7 +160,7 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = shared_from_this(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "PRODUCER") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateProducer(cnx, result, responseData); if (handleResult == ResultOk) { @@ -204,7 +204,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } if (!producerCreatedPromise_.isComplete()) { @@ -266,7 +267,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } @@ -818,7 +820,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { int requestId = client->newRequestId(); auto self = shared_from_this(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, "CLOSE_PRODUCER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } From 9d32866ed70d075a46f171676e2c046fe462aa4b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 13 Jan 2026 21:23:45 +0800 Subject: [PATCH 2/4] Add tests for the race --- lib/ClientConnection.cc | 40 ++++++++++++++++++++++++++++++++++++++- lib/ClientConnection.h | 15 +++++++++++++++ tests/ConsumerSeekTest.cc | 19 +++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index de73ccca..57470795 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "AsioDefines.h" @@ -1189,7 +1190,44 @@ Future ClientConnection::sendRequestWithId(const SharedBuf lock.unlock(); LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); - sendCommand(cmd); + if (mockingRequests_) { + if (auto iter = mockRequestDelays_.find(requestType); iter != mockRequestDelays_.end()) { + auto self = shared_from_this(); + if (strcmp(requestType, "SEEK") == 0) { + // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers + executor_->postWork([this, self] { + std::vector consumerIds; + { + Lock lock(mutex_); + for (const auto& entry : consumers_) { + if (auto consumer = entry.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + self->handleCloseConsumer(closeConsumerCmd); + } + }); + } + auto timer = executor_->createDeadlineTimer(); + timer->expires_after(std::chrono::milliseconds(iter->second)); + LOG_INFO("Request " << requestType << " (req_id: " << requestId << ") is being delayed for " + << iter->second << " ms"); + timer->async_wait([self, cmd, requestId, timer](const ASIO_ERROR& ec) { + LOG_INFO("Complete request id: " << requestId); + proto::CommandSuccess success; + success.set_request_id(requestId); + self->handleSuccess(success); + }); + } else { + sendCommand(cmd); + } + } else { + sendCommand(cmd); + } return requestData.promise.getFuture(); } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 0d2085cb..ce857f45 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -123,6 +123,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this; public: typedef std::shared_ptr SocketPtr; @@ -209,6 +211,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId); + void mockRequestDelay(RequestDelayType requestDelays) { + if (mockingRequests_) { + throw new std::runtime_error("Already mocking requests"); + } + mockRequestDelays_.swap(requestDelays); + mockingRequests_ = true; + } + private: struct PendingRequestData { Promise promise; @@ -393,6 +403,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& consumerStatsRequests); void startConsumerStatsTimer(std::vector consumerStatsRequests); diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index f03ea5e3..c70cdad9 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -24,7 +24,9 @@ #include #include "HttpHelper.h" +#include "lib/Latch.h" #include "lib/LogUtils.h" +#include "tests/PulsarFriend.h" DECLARE_LOG_OBJECT() @@ -200,6 +202,23 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } +// Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response +TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + for (auto&& connection : PulsarFriend::getConnections(client)) { + connection->mockRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 600}}); + } + + Latch latch(1); + consumer.seekAsync(0L, [&latch](Result result) { latch.countdown(); }); + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); } // namespace pulsar From 814afb19b0c64baee7ca0055efed6df27a6a0ce6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 14 Jan 2026 12:01:57 +0800 Subject: [PATCH 3/4] fix testSubscribeSeekRaces --- lib/ConsumerImpl.cc | 42 ++++++++++++++++++------------------------ lib/ConsumerImpl.h | 19 +++++++++---------- lib/Synchronized.h | 6 ++++++ 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 8dadc5d8..4f6c7f24 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -238,7 +238,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c cnx->registerConsumer(consumerId_, get_shared_this_ptr()); LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); - if (duringSeek()) { + if (hasPendingSeek_.load(std::memory_order_acquire)) { ackGroupingTrackerPtr_->flushAndClean(); } @@ -269,6 +269,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c } else { promise.setFailed(handleResult); } + completeSeekCallback(ResultOk); }); return promise.getFuture(); @@ -1129,7 +1130,7 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * `startMessageId_` is updated so that we can discard messages after delivery restarts. */ void ConsumerImpl::clearReceiveQueue() { - if (duringSeek()) { + if (hasPendingSeek_.load(std::memory_order_acquire)) { if (hasSoughtByTimestamp()) { // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. @@ -1137,11 +1138,6 @@ void ConsumerImpl::clearReceiveQueue() { } else { startMessageId_ = seekMessageId_.get(); } - SeekStatus expected = SeekStatus::COMPLETED; - if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) { - auto seekCallback = seekCallback_.release(); - executor_->postWork([seekCallback] { seekCallback(ResultOk); }); - } return; } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { return; @@ -1554,7 +1550,9 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb } const auto requestId = newRequestId(); - seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; + seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, + std::move(nonNullCallback)); } void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) { @@ -1568,8 +1566,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) } const auto requestId = newRequestId(); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp}, - callback); + std::move(nonNullCallback)); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } @@ -1727,7 +1726,7 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback) { + ResultCallback&& callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); @@ -1735,10 +1734,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c return; } - auto expected = SeekStatus::NOT_STARTED; - if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) { - LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is " - << static_cast(expected)); + auto expected = false; + if (hasPendingSeek_.compare_exchange_strong(expected, true)) { + LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek"); callback(ResultNotAllowedError); return; } @@ -1750,8 +1748,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c seekMessageId_ = *boost::get(&seekArg); hasSoughtByTimestamp_.store(false, std::memory_order_release); } - seekStatus_ = SeekStatus::IN_PROGRESS; - seekCallback_ = callback; + seekCallback_ = std::move(callback); LOG_INFO(getName() << " Seeking subscription to " << seekArg); auto weakSelf = weak_from_this(); @@ -1771,20 +1768,17 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c Lock lock(mutexForMessageId_); lastDequedMessageId_ = MessageId::earliest(); lock.unlock(); - if (getCnx().expired()) { - // It's during reconnection, complete the seek future after connection is established - seekStatus_ = SeekStatus::COMPLETED; - } else { + + if (!getCnx().expired()) { if (!hasSoughtByTimestamp()) { startMessageId_ = seekMessageId_.get(); } - seekCallback_.release()(result); - } + completeSeekCallback(result); + } // else: complete the seek future after connection is established } else { LOG_ERROR(getName() << "Failed to seek: " << result); seekMessageId_ = originalSeekMessageId; - seekStatus_ = SeekStatus::NOT_STARTED; - seekCallback_.release()(result); + completeSeekCallback(result); } }); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 63eb51d6..5e1d28ee 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -21,6 +21,7 @@ #include +#include #include #include #include @@ -77,13 +78,6 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; -enum class SeekStatus : std::uint8_t -{ - NOT_STARTED, - IN_PROGRESS, - COMPLETED -}; - class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr& client, const std::string& topic, const std::string& subscriptionName, @@ -230,7 +224,13 @@ class ConsumerImpl : public ConsumerImplBase { } void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback); + ResultCallback&& callback); + void completeSeekCallback(Result result) { + if (auto callback = seekCallback_.release()) { + callback(result); + } + hasPendingSeek_.store(false, std::memory_order_release); + } void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb); std::mutex mutexForReceiveWithZeroQueueSize; @@ -274,14 +274,13 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; - std::atomic seekStatus_{SeekStatus::NOT_STARTED}; Synchronized seekCallback_{[](Result) {}}; Synchronized> startMessageId_; + std::atomic_bool hasPendingSeek_{false}; Synchronized seekMessageId_{MessageId::earliest()}; std::atomic hasSoughtByTimestamp_{false}; bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); } - bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; } class ChunkedMessageCtx { public: diff --git a/lib/Synchronized.h b/lib/Synchronized.h index 5449a9fe..68ae37be 100644 --- a/lib/Synchronized.h +++ b/lib/Synchronized.h @@ -41,6 +41,12 @@ class Synchronized { return *this; } + Synchronized& operator=(T&& value) { + std::lock_guard lock(mutex_); + value_ = value; + return *this; + } + private: T value_; mutable std::mutex mutex_; From 1303c059ed2097a4124c4c54707aafcec49d1e13 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 14 Jan 2026 14:34:33 +0800 Subject: [PATCH 4/4] fix tests --- lib/ClientConnection.cc | 39 +++----------- lib/ClientConnection.h | 19 +++---- lib/ConsumerImpl.cc | 2 +- lib/ConsumerImpl.h | 8 +-- lib/MockServer.h | 105 ++++++++++++++++++++++++++++++++++++++ tests/ConsumerSeekTest.cc | 45 +++++++++++++--- 6 files changed, 165 insertions(+), 53 deletions(-) create mode 100644 lib/MockServer.h diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 57470795..4f7a1dd1 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -32,6 +32,7 @@ #include "ConsumerImpl.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "MockServer.h" #include "OpSendMsg.h" #include "ProducerImpl.h" #include "PulsarApi.pb.h" @@ -1190,39 +1191,11 @@ Future ClientConnection::sendRequestWithId(const SharedBuf lock.unlock(); LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); - if (mockingRequests_) { - if (auto iter = mockRequestDelays_.find(requestType); iter != mockRequestDelays_.end()) { - auto self = shared_from_this(); - if (strcmp(requestType, "SEEK") == 0) { - // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers - executor_->postWork([this, self] { - std::vector consumerIds; - { - Lock lock(mutex_); - for (const auto& entry : consumers_) { - if (auto consumer = entry.second.lock()) { - consumerIds.push_back(consumer->getConsumerId()); - } - } - } - for (auto consumerId : consumerIds) { - proto::CommandCloseConsumer closeConsumerCmd; - closeConsumerCmd.set_consumer_id(consumerId); - self->handleCloseConsumer(closeConsumerCmd); - } - }); - } - auto timer = executor_->createDeadlineTimer(); - timer->expires_after(std::chrono::milliseconds(iter->second)); - LOG_INFO("Request " << requestType << " (req_id: " << requestId << ") is being delayed for " - << iter->second << " ms"); - timer->async_wait([self, cmd, requestId, timer](const ASIO_ERROR& ec) { - LOG_INFO("Complete request id: " << requestId); - proto::CommandSuccess success; - success.set_request_id(requestId); - self->handleSuccess(success); - }); - } else { + if (mockingRequests_.load(std::memory_order_acquire)) { + if (mockServer_ == nullptr) { + LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType); + sendCommand(cmd); + } else if (!mockServer_->sendRequest(requestType, requestId)) { sendCommand(cmd); } } else { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index ce857f45..aae53d23 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -115,6 +115,7 @@ struct ResponseData { typedef std::shared_ptr> NamespaceTopicsPtr; +class MockServer; class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this { enum State : uint8_t { @@ -211,12 +212,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId); - void mockRequestDelay(RequestDelayType requestDelays) { - if (mockingRequests_) { - throw new std::runtime_error("Already mocking requests"); - } - mockRequestDelays_.swap(requestDelays); - mockingRequests_ = true; + void attachMockServer(const std::shared_ptr& mockServer) { + mockServer_ = mockServer; + // Mark that requests will first go through the mock server, if the mock server cannot process it, + // fall back to the normal logic + mockingRequests_.store(true, std::memory_order_release); } private: @@ -320,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this state_{Pending}; TimeDuration operationsTimeout_; AuthenticationPtr authentication_; @@ -403,10 +405,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); @@ -422,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this +#include +#include +#include + +#include "ClientConnection.h" +#include "ConsumerImpl.h" +#include "ExecutorService.h" +#include "LogUtils.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +class MockServer { + public: + using RequestDelayType = std::unordered_map; + + MockServer(const ClientConnectionPtr& connection) : connection_(connection) {} + + void setRequestDelay(std::initializer_list delays) { + std::lock_guard lock(mutex_); + for (auto&& delay : delays) { + requestDelays_[delay.first] = delay.second; + } + } + + bool sendRequest(const std::string& request, uint64_t requestId) { + auto connection = connection_.lock(); + if (!connection) { + return false; + } + std::lock_guard lock(mutex_); + if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) { + // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers + if (request == "SEEK") { + connection->executor_->postWork([connection] { + std::vector consumerIds; + { + std::lock_guard lock{connection->mutex_}; + for (auto&& kv : connection->consumers_) { + if (auto consumer = kv.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + connection->handleCloseConsumer(closeConsumerCmd); + } + }); + } + long delayMs = iter->second; + auto timer = connection->executor_->createDeadlineTimer(); + timer->expires_from_now(std::chrono::milliseconds(delayMs)); + timer->async_wait([connection, requestId, request, timer](const auto& ec) { + if (ec) { + LOG_INFO("Timer cancelled for request " << request << " with id " << requestId); + return; + } + if (connection->isClosed()) { + LOG_INFO("Connection is closed, not completing request " << request << " with id " + << requestId); + return; + } + LOG_INFO("Completing delayed request " << request << " with id " << requestId); + proto::CommandSuccess success; + success.set_request_id(requestId); + connection->handleSuccess(success); + }); + return true; + } else { + return false; + } + } + + private: + mutable std::mutex mutex_; + std::unordered_map requestDelays_; + ClientConnectionWeakPtr connection_; + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index c70cdad9..c3589a93 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -19,13 +19,17 @@ #include #include +#include +#include +#include #include #include #include #include "HttpHelper.h" -#include "lib/Latch.h" +#include "lib/ClientConnection.h" #include "lib/LogUtils.h" +#include "lib/MockServer.h" #include "tests/PulsarFriend.h" DECLARE_LOG_OBJECT() @@ -202,19 +206,46 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } +static void assertSeekWithTimeout(Consumer& consumer) { + using namespace std::chrono_literals; + auto promise = std::make_shared>(); + consumer.seekAsync(0L, [promise](Result result) { promise->set_value(result); }); + auto future = promise->get_future(); + ASSERT_EQ(future.wait_for(5s), std::future_status::ready); + ASSERT_EQ(future.get(), ResultOk); +} + // Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { Client client(lookupUrl); Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); - for (auto&& connection : PulsarFriend::getConnections(client)) { - connection->mockRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 600}}); - } + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}}); + assertSeekWithTimeout(consumer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}}); + assertSeekWithTimeout(consumer); + + client.close(); +} + +TEST_F(ConsumerSeekTest, testReconnectionSlow) { + Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500)); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); - Latch latch(1); - consumer.seekAsync(0L, [&latch](Result result) { latch.countdown(); }); - ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + // Make seek response received before `connectionOpened` is called + mockServer->setRequestDelay({{"SEEK", 100}}); + assertSeekWithTimeout(consumer); client.close(); }