Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <openssl/x509.h>
#include <pulsar/MessageIdBuilder.h>

#include <chrono>
#include <fstream>

#include "AsioDefines.h"
Expand All @@ -31,6 +32,7 @@
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "MockServer.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
Expand Down Expand Up @@ -1005,15 +1007,17 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
const std::string& listenerName, uint64_t requestId,
const LookupDataResultPromisePtr& promise) {
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP",
promise);
}

void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId,
const LookupDataResultPromisePtr& promise) {
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise);
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA",
promise);
}

void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
const LookupDataResultPromisePtr& promise) {
Lock lock(mutex_);
std::shared_ptr<LookupDataResultPtr> lookupDataResult;
Expand Down Expand Up @@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
numOfPendingLookupRequest_++;
lock.unlock();
LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")");
sendCommand(cmd);
}

Expand Down Expand Up @@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() {
}
}

Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) {
Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId,
const char* requestType) {
Lock lock(mutex_);

if (isClosed()) {
lock.unlock();
Promise<Result, ResponseData> promise;
LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId
<< ") to a closed connection");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
Expand All @@ -1182,7 +1190,17 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
pendingRequests_.insert(std::make_pair(requestId, requestData));
lock.unlock();

sendCommand(cmd);
LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")");
if (mockingRequests_.load(std::memory_order_acquire)) {
if (mockServer_ == nullptr) {
LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType);
sendCommand(cmd);
} else if (!mockServer_->sendRequest(requestType, requestId)) {
sendCommand(cmd);
}
} else {
sendCommand(cmd);
}
return requestData.promise.getFuture();
}

Expand Down Expand Up @@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse(

void ClientConnection::handleLookupTopicRespose(
const proto::CommandLookupTopicResponse& lookupTopicResponse) {
LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
<< lookupTopicResponse.request_id());

Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
Expand Down
22 changes: 20 additions & 2 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ struct ResponseData {

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;

class MockServer;
class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<ClientConnection> {
enum State : uint8_t
{
Expand All @@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Ready,
Disconnected
};
using RequestDelayType =
std::unordered_map<std::string /* request type */, long /* delay in milliseconds */>;

public:
typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
Expand Down Expand Up @@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
* Send a request with a specific Id over the connection. The future will be
* triggered when the response for this request is received
*/
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId);
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId,
const char* requestType);

const std::string& brokerAddress() const;

Expand All @@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
uint64_t requestId);

void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
mockServer_ = mockServer;
// Mark that requests will first go through the mock server, if the mock server cannot process it,
// fall back to the normal logic
mockingRequests_.store(true, std::memory_order_release);
}

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
Expand Down Expand Up @@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
void sendPendingCommands();
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const LookupDataResultPromisePtr& promise);
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
const LookupDataResultPromisePtr& promise);

void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& pendingRequestData);

Expand Down Expand Up @@ -308,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
}
}

void mockSendCommand(const char* requestType, uint64_t requestId, const SharedBuffer& cmd);

std::atomic<State> state_{Pending};
TimeDuration operationsTimeout_;
AuthenticationPtr authentication_;
Expand Down Expand Up @@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;

std::atomic_bool mockingRequests_{false};
std::shared_ptr<MockServer> mockServer_;

void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector<uint64_t>& consumerStatsRequests);

void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
Expand All @@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

friend class PulsarFriend;
friend class ConsumerTest;
friend class MockServer;

void checkServerError(ServerError error, const std::string& message);

Expand Down
65 changes: 32 additions & 33 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ ConsumerImpl::~ConsumerImpl() {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
auto requestId = newRequestId();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId,
"CLOSE_CONSUMER");
cnx->removeConsumer(consumerId_);
LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_);
} else {
Expand Down Expand Up @@ -235,8 +236,9 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'incomming' to 'incoming'

Suggested change
// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
// Register consumer so that we can handle other incoming commands (e.g. ACTIVE_CONSUMER_CHANGE) after

Copilot uses AI. Check for mistakes.
// sending the subscribe request.
cnx->registerConsumer(consumerId_, get_shared_this_ptr());
LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_);

if (duringSeek()) {
if (hasPendingSeek_.load(std::memory_order_acquire)) {
ackGroupingTrackerPtr_->flushAndClean();
}

Expand All @@ -259,14 +261,15 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
// Keep a reference to ensure object is kept alive.
auto self = get_shared_this_ptr();
setFirstRequestIdAfterConnect(requestId);
cnx->sendRequestWithId(cmd, requestId)
cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateConsumer(cnx, result);
if (handleResult == ResultOk) {
promise.setSuccess();
} else {
promise.setFailed(handleResult);
}
completeSeekCallback(ResultOk);
});

return promise.getFuture();
Expand Down Expand Up @@ -301,7 +304,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
int requestId = client->newRequestId();
auto name = getName();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId,
"CLOSE_CONSUMER")
.addListener([name](Result result, const ResponseData&) {
if (result == ResultOk) {
LOG_INFO(name << "Closed consumer successfully after subscribe completed");
Expand Down Expand Up @@ -354,7 +358,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
// in case it was indeed created, otherwise it might prevent new subscribe operation,
// since we are not closing the connection
auto requestId = newRequestId();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId,
"CLOSE_CONSUMER");
}

if (consumerCreatedPromise_.isComplete()) {
Expand Down Expand Up @@ -408,7 +413,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) {
auto requestId = newRequestId();
SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
auto self = get_shared_this_ptr();
cnx->sendRequestWithId(cmd, requestId)
cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE")
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
} else {
Result result = ResultNotConnected;
Expand Down Expand Up @@ -1125,19 +1130,14 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
* `startMessageId_` is updated so that we can discard messages after delivery restarts.
*/
void ConsumerImpl::clearReceiveQueue() {
if (duringSeek()) {
if (hasPendingSeek_.load(std::memory_order_acquire)) {
if (hasSoughtByTimestamp()) {
// Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
// skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
startMessageId_ = std::nullopt;
} else {
startMessageId_ = seekMessageId_.get();
}
SeekStatus expected = SeekStatus::COMPLETED;
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
auto seekCallback = seekCallback_.release();
executor_->postWork([seekCallback] { seekCallback(ResultOk); });
}
return;
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
return;
Expand Down Expand Up @@ -1374,7 +1374,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {

auto requestId = newRequestId();
auto self = get_shared_this_ptr();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER")
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}

Expand Down Expand Up @@ -1550,7 +1550,9 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb
}

const auto requestId = newRequestId();
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId},
std::move(nonNullCallback));
}

void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
Expand All @@ -1564,8 +1566,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback)
}

const auto requestId = newRequestId();
auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
callback);
std::move(nonNullCallback));
}

bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
Expand Down Expand Up @@ -1723,18 +1726,17 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }

void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
const ResultCallback& callback) {
ResultCallback&& callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected);
return;
}

auto expected = SeekStatus::NOT_STARTED;
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
<< static_cast<int>(expected));
auto expected = false;
if (!hasPendingSeek_.compare_exchange_strong(expected, true)) {
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek");
callback(ResultNotAllowedError);
return;
}
Expand All @@ -1746,13 +1748,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
seekMessageId_ = *boost::get<MessageId>(&seekArg);
hasSoughtByTimestamp_.store(false, std::memory_order_release);
}
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = callback;
seekCallback_ = std::move(callback);
LOG_INFO(getName() << " Seeking subscription to " << seekArg);

auto weakSelf = weak_from_this();

cnx->sendRequestWithId(seek, requestId)
cnx->sendRequestWithId(seek, requestId, "SEEK")
.addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
const ResponseData& responseData) {
auto self = weakSelf.lock();
Expand All @@ -1767,20 +1768,17 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
Lock lock(mutexForMessageId_);
lastDequedMessageId_ = MessageId::earliest();
lock.unlock();
if (getCnx().expired()) {
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {

if (!getCnx().expired()) {
if (!hasSoughtByTimestamp()) {
startMessageId_ = seekMessageId_.get();
}
seekCallback_.release()(result);
}
completeSeekCallback(result);
} // else: complete the seek future after connection is established
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
seekMessageId_ = originalSeekMessageId;
seekStatus_ = SeekStatus::NOT_STARTED;
seekCallback_.release()(result);
completeSeekCallback(result);
}
});
}
Expand Down Expand Up @@ -1928,7 +1926,7 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI
auto requestId = newRequestId();
cnx->sendRequestWithId(
Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId),
requestId)
requestId, "ACK")
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
Expand Down Expand Up @@ -1958,7 +1956,8 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (config_.isAckReceiptEnabled()) {
auto requestId = newRequestId();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId,
"ACK")
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
Expand Down
Loading
Loading