Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ typedef std::map<ChannelBase*, Socket*> ChannelToIdMap;
class SubChannel : public SocketUser {
public:
ChannelBase* chan;
ChannelOwnership ownership;

// internal channel is deleted after the fake Socket is SetFailed
void BeforeRecycle(Socket*) {
delete chan;
if (ownership == OWNS_CHANNEL) {
delete chan;
}
delete this;
}

Expand Down Expand Up @@ -84,6 +87,7 @@ class ChannelBalancer : public SharedLoadBalancer {
~ChannelBalancer();
int Init(const char* lb_name);
int AddChannel(ChannelBase* sub_channel, const std::string& tag,
ChannelOwnership ownership,
SelectiveChannel::ChannelHandle* handle);
void RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle);
int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out);
Expand Down Expand Up @@ -168,7 +172,9 @@ int ChannelBalancer::Init(const char* lb_name) {
return SharedLoadBalancer::Init(lb_name);
}

int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag,
int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
const std::string& tag,
ChannelOwnership ownership,
SelectiveChannel::ChannelHandle* handle) {
if (NULL == sub_channel) {
LOG(ERROR) << "Parameter[sub_channel] is NULL";
Expand All @@ -185,6 +191,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag
return -1;
}
sub_chan->chan = sub_channel;
sub_chan->ownership = ownership;
SocketId sock_id;
SocketOptions options;
options.user = sub_chan;
Expand Down Expand Up @@ -215,7 +222,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag
return -1;
}
// The health-check-related reference has been held on created.
_chan_map[sub_channel]= ptr.get();
_chan_map[sub_channel] = ptr.get();
if (handle) {
handle->id = sock_id;
handle->tag = tag;
Expand Down Expand Up @@ -531,21 +538,17 @@ bool SelectiveChannel::initialized() const {
return _chan._lb != NULL;
}

int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
ChannelHandle* handle) {
return AddChannel(sub_channel, "", handle);
}

int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
const std::string& tag,
ChannelOwnership ownership,
ChannelHandle* handle) {
schan::ChannelBalancer* lb =
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
if (lb == NULL) {
LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel";
return -1;
}
return lb->AddChannel(sub_channel, tag, handle);
return lb->AddChannel(sub_channel, tag, ownership, handle);
}

void SelectiveChannel::RemoveAndDestroyChannel(const ChannelHandle& handle) {
Expand Down
15 changes: 13 additions & 2 deletions src/brpc/selective_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,19 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ {
// On success, handle is set with the key for removal.
// NOTE: Different from pchan, schan can add channels at any time.
// Returns 0 on success, -1 otherwise.
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle);
int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle);
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle) {
return AddChannel(sub_channel, "", OWNS_CHANNEL, handle);
}
int AddChannel(ChannelBase* sub_channel, const std::string& tag,
ChannelHandle* handle) {
return AddChannel(sub_channel, tag, OWNS_CHANNEL, handle);
}
int AddChannel(ChannelBase* sub_channel, ChannelOwnership ownership,
ChannelHandle* handle) {
return AddChannel(sub_channel, "", ownership, handle);
}
int AddChannel(ChannelBase* sub_channel, const std::string& tag,
ChannelOwnership ownership, ChannelHandle* handle);

// Remove and destroy the sub_channel associated with `handle'.
void RemoveAndDestroyChannel(const ChannelHandle& handle);
Expand Down
Loading