Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 84 additions & 18 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,18 @@ static void HandleBackupRequest(void* arg) {
bthread_id_error(correlation_id, EBACKUPREQUEST);
}

void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
template <bool is_pb>
void Channel::CallMethodInternal(const typename std::conditional<is_pb,
google::protobuf::MethodDescriptor,
brpc::flatbuffers::MethodDescriptor>::type* method,
google::protobuf::RpcController* controller_base,
const typename std::conditional<is_pb,
google::protobuf::Message,
brpc::flatbuffers::Message>::type* request,
typename std::conditional<is_pb,
google::protobuf::Message,
brpc::flatbuffers::Message>::type* response,
google::protobuf::Closure* done) {
const int64_t start_send_real_us = butil::gettimeofday_us();
Controller* cntl = static_cast<Controller*>(controller_base);
cntl->OnRPCBegin(start_send_real_us);
Expand Down Expand Up @@ -492,22 +499,36 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,

if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
const int64_t start_send_us = butil::cpuwide_time_us();
std::string method_name;
const std::string* method_name = NULL;
if (_get_method_name) {
method_name = butil::EnsureString(_get_method_name(method, cntl));
if (is_pb) {
auto pb_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
method_name = &_get_method_name(pb_method, cntl);
} else {
// FlatBuffers doesn't support _get_method_name yet
method_name = NULL;
}
} else if (method) {
method_name = butil::EnsureString(method->full_name());
if (is_pb) {
auto pb_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
method_name = &pb_method->full_name();
} else {
auto fb_method = reinterpret_cast<const brpc::flatbuffers::MethodDescriptor*>(method);
method_name = &fb_method->full_name();
}
} else {
const static std::string NULL_METHOD_STR = "null-method";
method_name = NULL_METHOD_STR;
method_name = &NULL_METHOD_STR;
}
if (method_name) {
Span* span = Span::CreateClientSpan(
*method_name, start_send_real_us - start_send_us);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
cntl->_span = span;
}
Span* span = Span::CreateClientSpan(
method_name, start_send_real_us - start_send_us);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
cntl->_span = span;
}
// Override some options if they haven't been set by Controller
if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
Expand All @@ -525,11 +546,20 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
cntl->set_connection_type(_options.connection_type);
}
cntl->_response = response;

cntl->_done = done;
cntl->_pack_request = _pack_request;
cntl->_method = method;
cntl->_auth = _options.auth;
// Use reinterpret_cast to avoid template instantiation errors
// The actual type is guaranteed by the is_pb parameter
if (is_pb) {
cntl->_method = reinterpret_cast<const google::protobuf::MethodDescriptor*>(method);
cntl->_response = reinterpret_cast<google::protobuf::Message*>(response);
} else {
cntl->_fb_method = reinterpret_cast<const brpc::flatbuffers::MethodDescriptor*>(method);
cntl->_fb_response = reinterpret_cast<brpc::flatbuffers::Message*>(response);
cntl->set_use_flatbuffer();
}

if (SingleServer()) {
cntl->_single_server_id = _server_id;
Expand Down Expand Up @@ -615,6 +645,22 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
}
}

void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
CallMethodInternal<true>(method, controller_base, request, response, done);
}

void Channel::FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const brpc::flatbuffers::Message* request,
brpc::flatbuffers::Message* response,
google::protobuf::Closure* done) {
CallMethodInternal<false>(method, controller_base, request, response, done);
}

void Channel::Describe(std::ostream& os, const DescribeOptions& opt) const {
os << "Channel[";
if (SingleServer()) {
Expand Down Expand Up @@ -644,4 +690,24 @@ int Channel::CheckHealth() {
}
}

// CallMethodInternal instance for pb and fb
template
void Channel::CallMethodInternal<true>(
const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done
);

// CallMethodInternal instance for pb and fb
template
void Channel::CallMethodInternal<false>(
const brpc::flatbuffers::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const brpc::flatbuffers::Message* request,
brpc::flatbuffers::Message* response,
google::protobuf::Closure* done
);

} // namespace brpc
22 changes: 21 additions & 1 deletion src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/backup_request_policy.h"
#include "brpc/naming_service_filter.h"
#include "brpc/health_check_option.h"
#include "brpc/details/flatbuffers_impl.h" // flatbuffers

namespace brpc {

Expand Down Expand Up @@ -163,7 +164,8 @@ struct ChannelOptions {
// channel.Init("bns://rdev.matrix.all", "rr", NULL/*default options*/);
// MyService_Stub stub(&channel);
// stub.MyMethod(&controller, &request, &response, NULL);
class Channel : public ChannelBase {
class Channel : public ChannelBase,
public brpc::flatbuffers::RpcChannel {
friend class Controller;
friend class SelectiveChannel;
public:
Expand Down Expand Up @@ -212,6 +214,11 @@ friend class SelectiveChannel;
google::protobuf::Message* response,
google::protobuf::Closure* done);

void FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const brpc::flatbuffers::Message* request,
brpc::flatbuffers::Message* response,
google::protobuf::Closure* done);
// Get current options.
const ChannelOptions& options() const { return _options; }

Expand Down Expand Up @@ -239,6 +246,19 @@ friend class SelectiveChannel;
const ChannelOptions* options,
int raw_port = -1);

template <bool is_pb>
inline void CallMethodInternal(const typename std::conditional<is_pb,
google::protobuf::MethodDescriptor,
brpc::flatbuffers::MethodDescriptor>::type* method,
google::protobuf::RpcController* controller_base,
const typename std::conditional<is_pb,
google::protobuf::Message,
brpc::flatbuffers::Message>::type* request,
typename std::conditional<is_pb,
google::protobuf::Message,
brpc::flatbuffers::Message>::type* response,
google::protobuf::Closure* done);

std::string _service_name;
std::string _scheme;
butil::EndPoint _server_address;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/channel_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "butil/logging.h"
#include <google/protobuf/service.h> // google::protobuf::RpcChannel
#include "brpc/describable.h"
#include "brpc/details/flatbuffers_common.h"

// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
Expand Down
5 changes: 4 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ void Controller::ResetPods() {
_accessed = NULL;
_pack_request = NULL;
_method = NULL;
_fb_method = NULL;
_auth = NULL;
_idl_names = idl_single_req_single_res;
_idl_result = IDL_VOID_RESULT;
Expand Down Expand Up @@ -1200,7 +1201,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Make request
butil::IOBuf packet;
SocketMessage* user_packet = NULL;
_pack_request(&packet, &user_packet, cid.value, _method, this,
const void *method_desc = is_use_flatbuffer()? (const void*)_fb_method :
(const void*)_method;
_pack_request(&packet, &user_packet, cid.value, method_desc, this,
_request_buf, using_auth);
// TODO: PackRequest may accept SocketMessagePtr<>?
SocketMessagePtr<> user_packet_guard(user_packet);
Expand Down
10 changes: 10 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"
#include "brpc/details/flatbuffers_common.h"

// EAUTH is defined in MAC
#ifndef EAUTH
Expand Down Expand Up @@ -151,6 +152,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
static const uint32_t FLAGS_USE_FLATBUFFER = (1 << 23);

public:
struct Inheritable {
Expand Down Expand Up @@ -220,6 +222,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// Response of the RPC call (passed to CallMethod)
google::protobuf::Message* response() const { return _response; }

brpc::flatbuffers::Message* fb_response() const { return _fb_response; }

// An identifier to send to server along with request. This is widely used
// throughout baidu's servers to tag a searching session (a series of
// queries following the topology of servers) with a same log_id.
Expand Down Expand Up @@ -292,6 +296,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// Get the called method. May-be NULL for non-pb services.
const google::protobuf::MethodDescriptor* method() const { return _method; }

const brpc::flatbuffers::MethodDescriptor* fb_method() const { return _fb_method; }
// Get the controllers for accessing sub channels in combo channels.
// Ordinary channel:
// sub_count() is 0 and sub() is always NULL.
Expand Down Expand Up @@ -649,6 +654,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// the received time of RPC is not recorded in the controller.
int64_t get_rpc_received_us() const { return _rpc_received_us; }

void set_use_flatbuffer() { add_flag(FLAGS_USE_FLATBUFFER); }
bool is_use_flatbuffer() const { return has_flag(FLAGS_USE_FLATBUFFER); }

private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
Expand Down Expand Up @@ -860,6 +868,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
Inheritable _inheritable;
int _pchan_sub_count;
google::protobuf::Message* _response;
brpc::flatbuffers::Message* _fb_response;
google::protobuf::Closure* _done;
RPCSender* _sender;
uint64_t _request_code;
Expand All @@ -878,6 +887,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// Fields will be used when making requests
Protocol::PackRequest _pack_request;
const google::protobuf::MethodDescriptor* _method;
const brpc::flatbuffers::MethodDescriptor* _fb_method;
const Authenticator* _auth;
butil::IOBuf _request_buf;
IdlNames _idl_names;
Expand Down
3 changes: 3 additions & 0 deletions src/brpc/details/controller_private_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class ControllerPrivateAccessor {
void set_method(const google::protobuf::MethodDescriptor* method)
{ _cntl->_method = method; }

void set_fb_method(const brpc::flatbuffers::MethodDescriptor* method)
{ _cntl->_fb_method = method; }

void set_readable_progressive_attachment(ReadableProgressiveAttachment* s)
{ _cntl->_rpa.reset(s); }

Expand Down
5 changes: 5 additions & 0 deletions src/brpc/details/server_private_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class ServerPrivateAccessor {
return _server->FindServicePropertyByName(name);
}

const Server::FlatBuffersMethodProperty* FindFlatBufferMethodPropertyByIndex(
uint32_t server_index, int method_index) const {
return _server->FindFlatBufferMethodPropertyByIndex(server_index, method_index);
}

const Server::ServiceProperty*
FindServicePropertyAdaptively(const butil::StringPiece& service_name) const {
if (service_name.find('.') == butil::StringPiece::npos) {
Expand Down
10 changes: 10 additions & 0 deletions src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
// Protocols
#include "brpc/protocol.h"
#include "brpc/policy/baidu_rpc_protocol.h"
#include "brpc/policy/flatbuffers_protocol.h"
#include "brpc/policy/http_rpc_protocol.h"
#include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/policy/hulu_pbrpc_protocol.h"
Expand Down Expand Up @@ -423,6 +424,15 @@ static void GlobalInitializeOrDieImpl() {
exit(1);
}

Protocol fb_protocol = { ParseFlatBuffersMessage,
SerializeFlatBuffersRequest, PackFlatBuffersRequest,
ProcessFlatBuffersRequest, ProcessFlatBuffersResponse,
NULL, NULL, NULL,
CONNECTION_TYPE_SINGLE, "fb_rpc" };
if (RegisterProtocol(PROTOCOL_FLATBUFFERS_RPC, fb_protocol) != 0) {
exit(1);
}

Protocol streaming_protocol = { ParseStreamingMessage,
NULL, NULL, ProcessStreamingMessage,
ProcessStreamingMessage,
Expand Down
1 change: 1 addition & 0 deletions src/brpc/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ enum ProtocolType {
PROTOCOL_ESP = 25; // Client side only
PROTOCOL_H2 = 26;
PROTOCOL_COUCHBASE = 27;
PROTOCOL_FLATBUFFERS_RPC = 28;
}

enum CompressType {
Expand Down
7 changes: 5 additions & 2 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,8 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
}

void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl,
const google::protobuf::Message* request) {
const void* request_obj) {
const google::protobuf::Message* request = static_cast<const google::protobuf::Message*>(request_obj);
// Check sanity of request.
if (NULL == request) {
return cntl->SetFailed(EREQUEST, "`request' is NULL");
Expand Down Expand Up @@ -1045,10 +1046,12 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl,
void PackRpcRequest(butil::IOBuf* req_buf,
SocketMessage**,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor* method,
const void* method_descriptor,
Controller* cntl,
const butil::IOBuf& request_body,
const Authenticator* auth) {
const google::protobuf::MethodDescriptor* method =
static_cast<const google::protobuf::MethodDescriptor*>(method_descriptor);
RpcMeta meta;
if (auth && auth->GenerateCredential(
meta.mutable_authentication_data()) != 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/baidu_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ bool VerifyRpcRequest(const InputMessageBase* msg);

// Serialize `request' into `buf'.
void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl,
const google::protobuf::Message* request);
const void* request_obj);

// Pack `request' to `method' into `buf'.
void PackRpcRequest(butil::IOBuf* buf,
SocketMessage**,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor* method,
const void* method_descriptor,
Controller* controller,
const butil::IOBuf& request,
const Authenticator* auth);
Expand Down
5 changes: 3 additions & 2 deletions src/brpc/policy/couchbase_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ void ProcessCouchbaseResponse(InputMessageBase* msg_base) {
}

void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl,
const google::protobuf::Message* request) {
const void* request_obj) {
const google::protobuf::Message* request = static_cast<const google::protobuf::Message*>(request_obj);
if (request == NULL) {
return cntl->SetFailed(EREQUEST, "request is NULL");
}
Expand All @@ -208,7 +209,7 @@ void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl,

void PackCouchbaseRequest(butil::IOBuf* buf, SocketMessage**,
uint64_t /*correlation_id*/,
const google::protobuf::MethodDescriptor*,
const void*,
Controller* cntl, const butil::IOBuf& request,
const Authenticator* auth) {
if (auth) {
Expand Down
Loading
Loading