From 8483625a3ded7183b689f82d29ca9549c164651e Mon Sep 17 00:00:00 2001 From: yanyuan06 Date: Wed, 7 Jan 2026 16:19:22 +0800 Subject: [PATCH] surpport tag for selective channel --- src/brpc/controller.cpp | 2 +- src/brpc/selective_channel.cpp | 29 +++++++++++++++++------------ src/brpc/selective_channel.h | 8 ++++++-- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index b30a13476e..d7b511dbd4 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -862,7 +862,7 @@ void Controller::Call::OnComplete( } } - if (need_feedback) { + if (need_feedback && c->_lb) { const LoadBalancer::CallInfo info = { begin_time_us, peer_id, error_code, c }; c->_lb->Feedback(info); diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 21b871af3f..dd155a3044 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -83,9 +83,9 @@ class ChannelBalancer : public SharedLoadBalancer { ChannelBalancer() {} ~ChannelBalancer(); int Init(const char* lb_name); - int AddChannel(ChannelBase* sub_channel, + int AddChannel(ChannelBase* sub_channel, const std::string& tag, SelectiveChannel::ChannelHandle* handle); - void RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle); + void RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle); int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out); int CheckHealth(); void Describe(std::ostream& os, const DescribeOptions&); @@ -168,7 +168,7 @@ int ChannelBalancer::Init(const char* lb_name) { return SharedLoadBalancer::Init(lb_name); } -int ChannelBalancer::AddChannel(ChannelBase* sub_channel, +int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag, SelectiveChannel::ChannelHandle* handle) { if (NULL == sub_channel) { LOG(ERROR) << "Parameter[sub_channel] is NULL"; @@ -206,7 +206,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, << sock_id << " is disabled"; return -1; } - if (!AddServer(ServerId(sock_id))) { + if (!AddServer(ServerId(sock_id, tag))) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; // sub_chan will be deleted when the socket is recycled. ptr->SetFailed(); @@ -217,17 +217,18 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, // The health-check-related reference has been held on created. _chan_map[sub_channel]= ptr.get(); if (handle) { - *handle = sock_id; + handle->id = sock_id; + handle->tag = tag; } return 0; } -void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle) { - if (!RemoveServer(ServerId(handle))) { +void ChannelBalancer::RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle) { + if (!RemoveServer(ServerId(handle.id, handle.tag))) { return; } SocketUniquePtr ptr; - const int rc = Socket::AddressFailedAsWell(handle, &ptr); + const int rc = Socket::AddressFailedAsWell(handle.id, &ptr); if (rc >= 0) { SubChannel* sub = static_cast(ptr->user()); { @@ -311,8 +312,6 @@ int Sender::IssueRPC(int64_t start_realtime_us) { _main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc)); return -1; } - DLOG(INFO) << "Selected channel=" << sel_out.channel() << ", size=" - << (_main_cntl->_accessed ? _main_cntl->_accessed->size() : 0); _main_cntl->_current_call.need_feedback = sel_out.need_feedback; _main_cntl->_current_call.peer_id = sel_out.fake_sock->id(); @@ -534,16 +533,22 @@ bool SelectiveChannel::initialized() const { int SelectiveChannel::AddChannel(ChannelBase* sub_channel, ChannelHandle* handle) { + return AddChannel(sub_channel, "", handle); +} + +int SelectiveChannel::AddChannel(ChannelBase* sub_channel, + const std::string& tag, + ChannelHandle* handle) { schan::ChannelBalancer* lb = static_cast(_chan._lb.get()); if (lb == NULL) { LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel"; return -1; } - return lb->AddChannel(sub_channel, handle); + return lb->AddChannel(sub_channel, tag, handle); } -void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) { +void SelectiveChannel::RemoveAndDestroyChannel(const ChannelHandle& handle) { schan::ChannelBalancer* lb = static_cast(_chan._lb.get()); if (lb == NULL) { diff --git a/src/brpc/selective_channel.h b/src/brpc/selective_channel.h index 8b93e5bf69..6c0af1da9c 100644 --- a/src/brpc/selective_channel.h +++ b/src/brpc/selective_channel.h @@ -51,7 +51,10 @@ namespace brpc { // in `done'. class SelectiveChannel : public ChannelBase/*non-copyable*/ { public: - typedef SocketId ChannelHandle; + struct ChannelHandle { + SocketId id; + std::string tag; + }; SelectiveChannel(); ~SelectiveChannel(); @@ -67,9 +70,10 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ { // NOTE: Different from pchan, schan can add channels at any time. // Returns 0 on success, -1 otherwise. int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle); + int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle); // Remove and destroy the sub_channel associated with `handle'. - void RemoveAndDestroyChannel(ChannelHandle handle); + void RemoveAndDestroyChannel(const ChannelHandle& handle); // Send request by a sub channel. schan may retry another sub channel // according to retrying/backup-request settings.