diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 0fd43d7c9c..287ef0099e 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -430,11 +430,18 @@ static void HandleBackupRequest(void* arg) { bthread_id_error(correlation_id, EBACKUPREQUEST); } -void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, - google::protobuf::RpcController* controller_base, - const google::protobuf::Message* request, - google::protobuf::Message* response, - google::protobuf::Closure* done) { +template +void Channel::CallMethodInternal(const typename std::conditional::type* method, + google::protobuf::RpcController* controller_base, + const typename std::conditional::type* request, + typename std::conditional::type* response, + google::protobuf::Closure* done) { const int64_t start_send_real_us = butil::gettimeofday_us(); Controller* cntl = static_cast(controller_base); cntl->OnRPCBegin(start_send_real_us); @@ -492,22 +499,36 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) { const int64_t start_send_us = butil::cpuwide_time_us(); - std::string method_name; + const std::string* method_name = NULL; if (_get_method_name) { - method_name = butil::EnsureString(_get_method_name(method, cntl)); + if (is_pb) { + auto pb_method = reinterpret_cast(method); + method_name = &_get_method_name(pb_method, cntl); + } else { + // FlatBuffers doesn't support _get_method_name yet + method_name = NULL; + } } else if (method) { - method_name = butil::EnsureString(method->full_name()); + if (is_pb) { + auto pb_method = reinterpret_cast(method); + method_name = &pb_method->full_name(); + } else { + auto fb_method = reinterpret_cast(method); + method_name = &fb_method->full_name(); + } } else { const static std::string NULL_METHOD_STR = "null-method"; - method_name = NULL_METHOD_STR; + method_name = &NULL_METHOD_STR; + } + if (method_name) { + Span* span = Span::CreateClientSpan( + *method_name, start_send_real_us - start_send_us); + span->set_log_id(cntl->log_id()); + span->set_base_cid(correlation_id); + span->set_protocol(_options.protocol); + span->set_start_send_us(start_send_us); + cntl->_span = span; } - Span* span = Span::CreateClientSpan( - method_name, start_send_real_us - start_send_us); - span->set_log_id(cntl->log_id()); - span->set_base_cid(correlation_id); - span->set_protocol(_options.protocol); - span->set_start_send_us(start_send_us); - cntl->_span = span; } // Override some options if they haven't been set by Controller if (cntl->timeout_ms() == UNSET_MAGIC_NUM) { @@ -525,11 +546,20 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) { cntl->set_connection_type(_options.connection_type); } - cntl->_response = response; + cntl->_done = done; cntl->_pack_request = _pack_request; - cntl->_method = method; cntl->_auth = _options.auth; + // Use reinterpret_cast to avoid template instantiation errors + // The actual type is guaranteed by the is_pb parameter + if (is_pb) { + cntl->_method = reinterpret_cast(method); + cntl->_response = reinterpret_cast(response); + } else { + cntl->_fb_method = reinterpret_cast(method); + cntl->_fb_response = reinterpret_cast(response); + cntl->set_use_flatbuffer(); + } if (SingleServer()) { cntl->_single_server_id = _server_id; @@ -615,6 +645,22 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, } } +void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const google::protobuf::Message* request, + google::protobuf::Message* response, + google::protobuf::Closure* done) { + CallMethodInternal(method, controller_base, request, response, done); +} + +void Channel::FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done) { + CallMethodInternal(method, controller_base, request, response, done); +} + void Channel::Describe(std::ostream& os, const DescribeOptions& opt) const { os << "Channel["; if (SingleServer()) { @@ -644,4 +690,24 @@ int Channel::CheckHealth() { } } +// CallMethodInternal instance for pb and fb +template +void Channel::CallMethodInternal( + const google::protobuf::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const google::protobuf::Message* request, + google::protobuf::Message* response, + google::protobuf::Closure* done +); + +// CallMethodInternal instance for pb and fb +template +void Channel::CallMethodInternal( + const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done +); + } // namespace brpc diff --git a/src/brpc/channel.h b/src/brpc/channel.h index c970209b3a..927d6a70ee 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -37,6 +37,7 @@ #include "brpc/backup_request_policy.h" #include "brpc/naming_service_filter.h" #include "brpc/health_check_option.h" +#include "brpc/details/flatbuffers_impl.h" // flatbuffers namespace brpc { @@ -163,7 +164,8 @@ struct ChannelOptions { // channel.Init("bns://rdev.matrix.all", "rr", NULL/*default options*/); // MyService_Stub stub(&channel); // stub.MyMethod(&controller, &request, &response, NULL); -class Channel : public ChannelBase { +class Channel : public ChannelBase, + public brpc::flatbuffers::RpcChannel { friend class Controller; friend class SelectiveChannel; public: @@ -212,6 +214,11 @@ friend class SelectiveChannel; google::protobuf::Message* response, google::protobuf::Closure* done); + void FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done); // Get current options. const ChannelOptions& options() const { return _options; } @@ -239,6 +246,19 @@ friend class SelectiveChannel; const ChannelOptions* options, int raw_port = -1); + template + inline void CallMethodInternal(const typename std::conditional::type* method, + google::protobuf::RpcController* controller_base, + const typename std::conditional::type* request, + typename std::conditional::type* response, + google::protobuf::Closure* done); + std::string _service_name; std::string _scheme; butil::EndPoint _server_address; diff --git a/src/brpc/channel_base.h b/src/brpc/channel_base.h index ed6ff24e40..3b5a13f8aa 100644 --- a/src/brpc/channel_base.h +++ b/src/brpc/channel_base.h @@ -24,6 +24,7 @@ #include "butil/logging.h" #include // google::protobuf::RpcChannel #include "brpc/describable.h" +#include "brpc/details/flatbuffers_common.h" // To brpc developers: This is a header included by user, don't depend // on internal structures, use opaque pointers instead. diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index d7b511dbd4..3ca87c6c90 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -285,6 +285,7 @@ void Controller::ResetPods() { _accessed = NULL; _pack_request = NULL; _method = NULL; + _fb_method = NULL; _auth = NULL; _idl_names = idl_single_req_single_res; _idl_result = IDL_VOID_RESULT; @@ -1200,7 +1201,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) { // Make request butil::IOBuf packet; SocketMessage* user_packet = NULL; - _pack_request(&packet, &user_packet, cid.value, _method, this, + const void *method_desc = is_use_flatbuffer()? (const void*)_fb_method : + (const void*)_method; + _pack_request(&packet, &user_packet, cid.value, method_desc, this, _request_buf, using_auth); // TODO: PackRequest may accept SocketMessagePtr<>? SocketMessagePtr<> user_packet_guard(user_packet); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 69d859ea8f..2065eb0114 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -47,6 +47,7 @@ #include "brpc/grpc.h" #include "brpc/kvmap.h" #include "brpc/rpc_dump.h" +#include "brpc/details/flatbuffers_common.h" // EAUTH is defined in MAC #ifndef EAUTH @@ -151,6 +152,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); + static const uint32_t FLAGS_USE_FLATBUFFER = (1 << 23); public: struct Inheritable { @@ -220,6 +222,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Response of the RPC call (passed to CallMethod) google::protobuf::Message* response() const { return _response; } + brpc::flatbuffers::Message* fb_response() const { return _fb_response; } + // An identifier to send to server along with request. This is widely used // throughout baidu's servers to tag a searching session (a series of // queries following the topology of servers) with a same log_id. @@ -292,6 +296,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Get the called method. May-be NULL for non-pb services. const google::protobuf::MethodDescriptor* method() const { return _method; } + const brpc::flatbuffers::MethodDescriptor* fb_method() const { return _fb_method; } // Get the controllers for accessing sub channels in combo channels. // Ordinary channel: // sub_count() is 0 and sub() is always NULL. @@ -649,6 +654,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // the received time of RPC is not recorded in the controller. int64_t get_rpc_received_us() const { return _rpc_received_us; } + void set_use_flatbuffer() { add_flag(FLAGS_USE_FLATBUFFER); } + bool is_use_flatbuffer() const { return has_flag(FLAGS_USE_FLATBUFFER); } + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -860,6 +868,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); Inheritable _inheritable; int _pchan_sub_count; google::protobuf::Message* _response; + brpc::flatbuffers::Message* _fb_response; google::protobuf::Closure* _done; RPCSender* _sender; uint64_t _request_code; @@ -878,6 +887,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Fields will be used when making requests Protocol::PackRequest _pack_request; const google::protobuf::MethodDescriptor* _method; + const brpc::flatbuffers::MethodDescriptor* _fb_method; const Authenticator* _auth; butil::IOBuf _request_buf; IdlNames _idl_names; diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index 1a9d7062af..e25b6154ba 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -125,6 +125,9 @@ class ControllerPrivateAccessor { void set_method(const google::protobuf::MethodDescriptor* method) { _cntl->_method = method; } + void set_fb_method(const brpc::flatbuffers::MethodDescriptor* method) + { _cntl->_fb_method = method; } + void set_readable_progressive_attachment(ReadableProgressiveAttachment* s) { _cntl->_rpa.reset(s); } diff --git a/src/brpc/details/server_private_accessor.h b/src/brpc/details/server_private_accessor.h index aacf283564..61875615cc 100644 --- a/src/brpc/details/server_private_accessor.h +++ b/src/brpc/details/server_private_accessor.h @@ -86,6 +86,11 @@ class ServerPrivateAccessor { return _server->FindServicePropertyByName(name); } + const Server::FlatBuffersMethodProperty* FindFlatBufferMethodPropertyByIndex( + uint32_t server_index, int method_index) const { + return _server->FindFlatBufferMethodPropertyByIndex(server_index, method_index); + } + const Server::ServiceProperty* FindServicePropertyAdaptively(const butil::StringPiece& service_name) const { if (service_name.find('.') == butil::StringPiece::npos) { diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index c561d927d7..520e112192 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -67,6 +67,7 @@ // Protocols #include "brpc/protocol.h" #include "brpc/policy/baidu_rpc_protocol.h" +#include "brpc/policy/flatbuffers_protocol.h" #include "brpc/policy/http_rpc_protocol.h" #include "brpc/policy/http2_rpc_protocol.h" #include "brpc/policy/hulu_pbrpc_protocol.h" @@ -423,6 +424,15 @@ static void GlobalInitializeOrDieImpl() { exit(1); } + Protocol fb_protocol = { ParseFlatBuffersMessage, + SerializeFlatBuffersRequest, PackFlatBuffersRequest, + ProcessFlatBuffersRequest, ProcessFlatBuffersResponse, + NULL, NULL, NULL, + CONNECTION_TYPE_SINGLE, "fb_rpc" }; + if (RegisterProtocol(PROTOCOL_FLATBUFFERS_RPC, fb_protocol) != 0) { + exit(1); + } + Protocol streaming_protocol = { ParseStreamingMessage, NULL, NULL, ProcessStreamingMessage, ProcessStreamingMessage, diff --git a/src/brpc/options.proto b/src/brpc/options.proto index 4ad97aa828..2cd9dcd349 100644 --- a/src/brpc/options.proto +++ b/src/brpc/options.proto @@ -65,6 +65,7 @@ enum ProtocolType { PROTOCOL_ESP = 25; // Client side only PROTOCOL_H2 = 26; PROTOCOL_COUCHBASE = 27; + PROTOCOL_FLATBUFFERS_RPC = 28; } enum CompressType { diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 0dba01624a..c82e23ada1 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -1013,7 +1013,8 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { } void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); // Check sanity of request. if (NULL == request) { return cntl->SetFailed(EREQUEST, "`request' is NULL"); @@ -1045,10 +1046,12 @@ void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, void PackRpcRequest(butil::IOBuf* req_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* cntl, const butil::IOBuf& request_body, const Authenticator* auth) { + const google::protobuf::MethodDescriptor* method = + static_cast(method_descriptor); RpcMeta meta; if (auth && auth->GenerateCredential( meta.mutable_authentication_data()) != 0) { diff --git a/src/brpc/policy/baidu_rpc_protocol.h b/src/brpc/policy/baidu_rpc_protocol.h index 77ecc780a2..8850832183 100644 --- a/src/brpc/policy/baidu_rpc_protocol.h +++ b/src/brpc/policy/baidu_rpc_protocol.h @@ -39,13 +39,13 @@ bool VerifyRpcRequest(const InputMessageBase* msg); // Serialize `request' into `buf'. void SerializeRpcRequest(butil::IOBuf* request_buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Pack `request' to `method' into `buf'. void PackRpcRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/couchbase_protocol.cpp b/src/brpc/policy/couchbase_protocol.cpp index a014581ed5..2c5b8eb14b 100644 --- a/src/brpc/policy/couchbase_protocol.cpp +++ b/src/brpc/policy/couchbase_protocol.cpp @@ -191,7 +191,8 @@ void ProcessCouchbaseResponse(InputMessageBase* msg_base) { } void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); if (request == NULL) { return cntl->SetFailed(EREQUEST, "request is NULL"); } @@ -208,7 +209,7 @@ void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl, void PackCouchbaseRequest(butil::IOBuf* buf, SocketMessage**, uint64_t /*correlation_id*/, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf& request, const Authenticator* auth) { if (auth) { diff --git a/src/brpc/policy/couchbase_protocol.h b/src/brpc/policy/couchbase_protocol.h index 15367def0b..7aa07c144e 100644 --- a/src/brpc/policy/couchbase_protocol.h +++ b/src/brpc/policy/couchbase_protocol.h @@ -151,12 +151,12 @@ void ProcessCouchbaseResponse(InputMessageBase* msg); // Serialize a couchbase request. void SerializeCouchbaseRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request); // Pack `request' to `method' into `buf'. void PackCouchbaseRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/esp_protocol.cpp b/src/brpc/policy/esp_protocol.cpp index 5925796b88..18fd3ddf17 100644 --- a/src/brpc/policy/esp_protocol.cpp +++ b/src/brpc/policy/esp_protocol.cpp @@ -65,8 +65,8 @@ ParseResult ParseEspMessage( void SerializeEspRequest( butil::IOBuf* request_buf, Controller* cntl, - const google::protobuf::Message* req_base) { - + const void* req_obj) { + const google::protobuf::Message* req_base = static_cast(req_obj); if (req_base == NULL) { return cntl->SetFailed(EREQUEST, "request is NULL"); } @@ -89,7 +89,7 @@ void SerializeEspRequest( void PackEspRequest(butil::IOBuf* packet_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf& request, const Authenticator* auth) { diff --git a/src/brpc/policy/esp_protocol.h b/src/brpc/policy/esp_protocol.h index 7fb58c35b3..904939a14d 100644 --- a/src/brpc/policy/esp_protocol.h +++ b/src/brpc/policy/esp_protocol.h @@ -36,12 +36,12 @@ ParseResult ParseEspMessage( void SerializeEspRequest( butil::IOBuf* request_buf, Controller* controller, - const google::protobuf::Message* request); + const void* request_obj); void PackEspRequest(butil::IOBuf* packet_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* controller, const butil::IOBuf&, const Authenticator*); diff --git a/src/brpc/policy/flatbuffers_protocol.cpp b/src/brpc/policy/flatbuffers_protocol.cpp new file mode 100644 index 0000000000..824a1e2141 --- /dev/null +++ b/src/brpc/policy/flatbuffers_protocol.cpp @@ -0,0 +1,472 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "butil/logging.h" // LOG() +#include "butil/iobuf.h" // butil::IOBuf +#include "butil/single_iobuf.h" // butil::SingleIOBuf +#include "butil/time.h" + +#include "butil/raw_pack.h" // RawPacker RawUnpacker + +#include "brpc/controller.h" // Controller +#include "brpc/socket.h" // Socket +#include "brpc/server.h" // Server +#include "brpc/stream_impl.h" +#include "brpc/rpc_dump.h" // SampledRequest +#include "brpc/policy/most_common_message.h" +#include "brpc/details/controller_private_accessor.h" +#include "brpc/details/server_private_accessor.h" +#include "brpc/policy/flatbuffers_protocol.h" + +namespace brpc { +namespace policy { + +struct FBRpcRequestMeta { + struct { + uint32_t service_index; + int32_t method_index; + } request; + int32_t message_size; + int32_t attachment_size; + int64_t correlation_id; +}__attribute__((packed)); + +struct FBRpcResponseMeta { + struct { + int32_t error_code; + } response; + int32_t message_size; + int32_t attachment_size; + int64_t correlation_id; +}__attribute__((packed)); + +struct FBRpcRequestHeader { + char header[12]; + struct FBRpcRequestMeta meta; +}__attribute__((packed)); + +struct FBRpcResponseHeader { + char header[12]; + struct FBRpcResponseMeta meta; +}__attribute__((packed)); + +bool inline ParseFbFromIOBuf(brpc::flatbuffers::Message* msg, size_t msg_size, const butil::IOBuf& buf) { + return brpc::flatbuffers::ParseFbFromIOBUF(msg, msg_size, buf); +} + +// Notes: +// 1. 12-byte header [BRPC][body_size][meta_size] +// 2. body_size and meta_size are in network byte order +// 3. Use service->service_index + method_index to specify the method to call +// 4. `attachment_size' is set iff request/response has attachment +// 5. Not supported: chunk_info + +// Pack header into `buf' + +static inline void PackFlatbuffersRpcHeader(char* rpc_header, int meta_size, int payload_size) { + // supress strict-aliasing warning. + uint32_t* dummy = (uint32_t*)rpc_header; + *dummy = *(uint32_t*)"BRPC"; + butil::RawPacker(rpc_header + 4) + .pack32(meta_size + payload_size) + .pack32(meta_size); +} + +static inline bool ParseMetaBufferFromIOBUF(butil::SingleIOBuf* dest, + const butil::IOBuf& source, uint32_t msg_size) { + return dest->assign(source, msg_size); +} + +ParseResult ParseFlatBuffersMessage(butil::IOBuf* source, Socket* socket, + bool /*read_eof*/, const void*) { + char header_buf[12]; + const size_t n = source->copy_to(header_buf, sizeof(header_buf)); + if (n >= 4) { + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)"BRPC") { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + } else { + if (memcmp(header_buf, "BRPC", n) != 0) { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + } + if (n < sizeof(header_buf)) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + uint32_t body_size; + uint32_t meta_size; + butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > FLAGS_max_body_size) { + // We need this log to report the body_size to give users some clues + // which is not printed in InputMessenger. + LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); + } else if (source->length() < sizeof(header_buf) + body_size) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + if (meta_size > body_size) { + LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" + << body_size; + // Pop the message + source->pop_front(sizeof(header_buf) + body_size); + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + source->pop_front(sizeof(header_buf)); + MostCommonMessage* msg = MostCommonMessage::Get(); + source->cutn(&msg->meta, meta_size); + source->cutn(&msg->payload, body_size - meta_size); + return MakeMessage(msg); +} + +static void SendFlatBuffersRpcResponse(int64_t correlation_id, + Controller* cntl, + brpc::flatbuffers::Message* req, + brpc::flatbuffers::Message* res, + const Server* server, + MethodStatus* method_status_raw, + int64_t received_us) { + ControllerPrivateAccessor accessor(cntl); + Socket* sock = accessor.get_sending_socket(); + ConcurrencyRemover concurrency_remover(method_status_raw, cntl, received_us); + std::unique_ptr recycle_cntl(cntl); + std::unique_ptr recycle_req(req); + std::unique_ptr recycle_res(res); + //ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl); + if (cntl->IsCloseConnection()) { + sock->SetFailed(); + return; + } + bool append_body = false; + butil::IOBuf res_body; + // `res' can be NULL here, in which case we don't serialize it + // If user calls `SetFailed' on Controller, we don't serialize + // response either + struct FBRpcResponseHeader *rpc_header = NULL; + uint32_t reserve_size = sizeof(struct FBRpcResponseHeader); + + if (res != NULL && !cntl->Failed()) { + rpc_header = static_cast(res->reduce_meta_size_and_get_buf(sizeof(struct FBRpcResponseHeader))); + if (BAIDU_UNLIKELY(rpc_header == NULL)) { + cntl->SetFailed(ERESPONSE, "Fail to reduce meta size and get buf"); + } else { + if (!brpc::flatbuffers::SerializeFbToIOBUF(res, res_body)) { + cntl->SetFailed(ERESPONSE, "Fail to serialize response"); + } else { + append_body = true; + } + } + } + + // Don't use res->ByteSize() since it may be compressed + size_t res_size = 0; + size_t attached_size = 0; + size_t meta_size = sizeof(struct FBRpcResponseMeta); + if (append_body && rpc_header != NULL) { + res_size = res_body.length() - reserve_size; + attached_size = cntl->response_attachment().length(); + PackFlatbuffersRpcHeader(rpc_header->header, + meta_size, res_size + attached_size); + rpc_header->meta.message_size = res_size; + rpc_header->meta.attachment_size = attached_size; + rpc_header->meta.response.error_code = cntl->ErrorCode(); + rpc_header->meta.correlation_id = correlation_id; + if (attached_size > 0) { + res_body.append(cntl->response_attachment().movable()); + } + } else { // error response + struct FBRpcResponseHeader tmp_header; + tmp_header.meta.message_size = 0; + tmp_header.meta.attachment_size = 0; + tmp_header.meta.response.error_code = cntl->ErrorCode(); + tmp_header.meta.correlation_id = correlation_id; + PackFlatbuffersRpcHeader(tmp_header.header, + meta_size, 0); + res_body.clear(); + res_body.append((void const*) &tmp_header, + sizeof(struct FBRpcResponseHeader)); + } + Socket::WriteOptions wopt; + wopt.ignore_eovercrowded = true; + if (sock->Write(&res_body, &wopt) != 0) { + const int errcode = errno; + PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + cntl->SetFailed(errcode, "Fail to write into %s", + sock->description().c_str()); + return; + } +} + +void ProcessFlatBuffersRequest(InputMessageBase* msg_base) { + DestroyingPtr msg(static_cast(msg_base)); + SocketUniquePtr socket_guard(msg->ReleaseSocket()); + Socket* socket = socket_guard.get(); + const Server* server = static_cast(msg_base->arg()); + ScopedNonServiceError non_service_error(server); + butil::SingleIOBuf meta_buf; + if (!ParseMetaBufferFromIOBUF(&meta_buf, + msg->meta, sizeof(struct FBRpcRequestMeta))) { + LOG(WARNING) << "Fail to parse RpcMeta from " << *socket; + socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s", + socket->description().c_str()); + return; + } + const struct FBRpcRequestMeta* meta = + static_cast(meta_buf.get_begin()); + if (!meta) { + LOG(WARNING) << "RpcMeta from " << *socket << " is NULL"; + socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s", + socket->description().c_str()); + return; + } + + std::unique_ptr cntl; + cntl.reset(new (std::nothrow) Controller); + if (NULL == cntl.get()) { + LOG(WARNING) << "Fail to new Controller"; + return; + } + + std::unique_ptr req; + std::unique_ptr res; + + ServerPrivateAccessor server_accessor(server); + + ControllerPrivateAccessor accessor(cntl.get()); + accessor.set_server(server) + .set_peer_id(socket->id()) + .set_remote_side(socket->remote_side()) + .set_local_side(socket->local_side()) + .set_request_protocol(PROTOCOL_FLATBUFFERS_RPC) + .move_in_server_receiving_sock(socket_guard); + MethodStatus* method_status = NULL; + do { + if (!server->IsRunning()) { + cntl->SetFailed(ELOGOFF, "Server is stopping"); + break; + } + + if (socket->is_overcrowded()) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } + + if (!server_accessor.AddConcurrency(cntl.get())) { + cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", + server->options().max_concurrency); + break; + } + + const Server::FlatBuffersMethodProperty* mp = + server_accessor.FindFlatBufferMethodPropertyByIndex(meta->request.service_index, + meta->request.method_index); + if (NULL == mp) { + cntl->SetFailed(ENOMETHOD, "Fail to find method_index=%d service_index=%u ", + meta->request.method_index, + meta->request.service_index); + break; + } + // Switch to service-specific error. + non_service_error.release(); + if (mp->status) { + method_status = mp->status; + if (!method_status->OnRequested()) { + cntl->SetFailed(ELIMIT, "Reached %s's MaxConcurrency=%d", + mp->method->full_name().c_str(), + method_status->MaxConcurrency()); + break; + } + } + brpc::flatbuffers::Service* svc = mp->service; + const brpc::flatbuffers::MethodDescriptor* method = mp->method; + accessor.set_fb_method(method); + const int reqsize = static_cast(msg->payload.size()); + butil::IOBuf req_buf; + butil::IOBuf* req_buf_ptr = &msg->payload; + if (meta->attachment_size > 0) { + if (reqsize < meta->attachment_size) { + cntl->SetFailed(EREQUEST, + "attachment_size=%d is larger than request_size=%d", + meta->attachment_size, reqsize); + break; + } + int body_without_attachment_size = reqsize - meta->attachment_size; + msg->payload.cutn(&req_buf, body_without_attachment_size); + req_buf_ptr = &req_buf; + cntl->request_attachment().swap(msg->payload); + } + + req.reset(new brpc::flatbuffers::Message()); + if (!brpc::flatbuffers::ParseFbFromIOBUF(req.get(), meta->message_size, *req_buf_ptr)) { + cntl->SetFailed(EREQUEST, "Fail to parse request message, " + "request_size=%d", reqsize); + break; + } + res.reset(new brpc::flatbuffers::Message()); + // `socket' will be held until response has been sent + google::protobuf::Closure* done = ::brpc::NewCallback< + int64_t, Controller*, brpc::flatbuffers::Message*, + brpc::flatbuffers::Message*, const Server*, + MethodStatus*, int64_t>( + &SendFlatBuffersRpcResponse, meta->correlation_id, cntl.get(), + req.get(), res.get(), server, + method_status, msg->received_us()); + + // optional, just release resourse ASAP + msg.reset(); + req_buf.clear(); + //only used in polling thread + svc->FBCallMethod(method, cntl.release(), + req.release(), res.release(), done); + return; + } while (false); + // `cntl', `req' and `res' will be deleted inside `SendFlatBuffersRpcResponse' + // `socket' will be held until response has been sent + SendFlatBuffersRpcResponse(meta->correlation_id, cntl.release(), + req.release(), res.release(), server, + method_status, -1); +} + +void ProcessFlatBuffersResponse(InputMessageBase* msg_base) { + DestroyingPtr msg(static_cast(msg_base)); + butil::SingleIOBuf meta_buf; + if (!ParseMetaBufferFromIOBUF(&meta_buf, + msg->meta, sizeof(struct FBRpcResponseMeta))) { + LOG(WARNING) << "Fail to parse from response meta"; + return; + } + const struct FBRpcResponseMeta* meta = + static_cast(meta_buf.get_begin()); + if (!meta) { + LOG(WARNING) << "Fail to parse from response meta: meta is NULL"; + return; + } + + const bthread_id_t cid = { static_cast(meta->correlation_id) }; + Controller* cntl = NULL; + const int rc = bthread_id_lock(cid, (void**)&cntl); + if (rc != 0) { + LOG_IF(ERROR, rc != EINVAL && rc != EPERM) + << "Fail to lock correlation_id=" << cid << ": " << berror(rc); + return; + } + + ControllerPrivateAccessor accessor(cntl); + const int saved_error = cntl->ErrorCode(); + do { + if (meta->response.error_code != 0) { + // If error_code is unset, default is 0 = success. + cntl->SetFailed(meta->response.error_code, + "server response error"); + break; + } + // Parse response message if error code from meta is 0 + butil::IOBuf res_buf; + const int res_size = msg->payload.length(); + butil::IOBuf* res_buf_ptr = &msg->payload; + if (meta->attachment_size > 0) { + if (meta->attachment_size > res_size) { + cntl->SetFailed( + ERESPONSE, + "attachment_size=%d is larger than response_size=%d", + meta->attachment_size, res_size); + break; + } + int body_without_attachment_size = res_size - meta->attachment_size; + msg->payload.cutn(&res_buf, body_without_attachment_size); + res_buf_ptr = &res_buf; + cntl->response_attachment().swap(msg->payload); + } + + if (cntl->fb_response()) { + if (!brpc::flatbuffers::ParseFbFromIOBUF(cntl->fb_response(), + meta->message_size, *res_buf_ptr)) { + cntl->SetFailed( + ERESPONSE, "Fail to parse response message, " + " response_size=%d", res_size); + } + } // else silently ignore the response. + } while (0); + // Unlocks correlation_id inside. Revert controller's + // error code if it version check of `cid' fails + msg.reset(); // optional, just release resourse ASAP + accessor.OnResponse(cid, saved_error); +} + +void PackFlatBuffersRequest(butil::IOBuf* req_buf, + SocketMessage**, + uint64_t correlation_id, + const void* method_descriptor, + Controller* cntl, + const butil::IOBuf& request_body, + const Authenticator* auth) { + const brpc::flatbuffers::MethodDescriptor* method = + static_cast(method_descriptor); + struct FBRpcRequestHeader *rpc_header = NULL; + size_t req_size = request_body.length(); + rpc_header = (struct FBRpcRequestHeader*)const_cast(request_body.fetch1()); + if (BAIDU_UNLIKELY(rpc_header == NULL)) { + return cntl->SetFailed(ERESPONSE, "fail to get fb request rpc header"); + } + req_size -= sizeof(struct FBRpcRequestHeader); + + //ControllerPrivateAccessor accessor(cntl); + if (method) { + rpc_header->meta.request.service_index = method->service()->index(); + rpc_header->meta.request.method_index = method->index(); + } else { + return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__); + } + + rpc_header->meta.correlation_id = correlation_id; + + size_t meta_size = sizeof(struct FBRpcRequestMeta); + rpc_header->meta.message_size = req_size; + const size_t attached_size = cntl->request_attachment().length(); + if (attached_size > 0) { + rpc_header->meta.attachment_size = attached_size; + } else { + rpc_header->meta.attachment_size = 0; + } + PackFlatbuffersRpcHeader(rpc_header->header, meta_size, req_size + attached_size); + + req_buf->append(request_body); + + if (attached_size > 0) { + req_buf->append(cntl->request_attachment()); + } +} + +void SerializeFlatBuffersRequest(butil::IOBuf* buf, + Controller* cntl, + const void* request_obj) { + brpc::flatbuffers::Message* request = (brpc::flatbuffers::Message*)const_cast(request_obj); + // Check sanity of request. + if (!request) { + return cntl->SetFailed(EREQUEST, "`request' is NULL"); + } + uint32_t reserve_size = sizeof(struct FBRpcRequestHeader); + request->reduce_meta_size_and_get_buf(reserve_size); + if (!brpc::flatbuffers::SerializeFbToIOBUF(request, *buf)) { + return cntl->SetFailed(EREQUEST, "Fail to serialize request"); + } +} + +} // namespace policy +} // namespace brpc \ No newline at end of file diff --git a/src/brpc/policy/flatbuffers_protocol.h b/src/brpc/policy/flatbuffers_protocol.h new file mode 100644 index 0000000000..99da18aa47 --- /dev/null +++ b/src/brpc/policy/flatbuffers_protocol.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_POLICY_FLATBUFFERS_PROTOCOL_H +#define BRPC_POLICY_FLATBUFFERS_PROTOCOL_H + +#include "brpc/protocol.h" +#include "brpc/details/flatbuffers_impl.h" + +namespace brpc { +namespace policy { + +// Parse binary format of flatbuffers-pbrpc. +ParseResult ParseFlatBuffersMessage(butil::IOBuf* source, Socket *socket, bool read_eof, const void *arg); + +// Actions to a (client) request in flatbuffers-pbrpc format. +void ProcessFlatBuffersRequest(InputMessageBase* msg_base); + +// Actions to a (server) response in flatbuffers-pbrpc format. +void ProcessFlatBuffersResponse(InputMessageBase* msg); + +// The serialize_request implementation used by flatbuffers protocol. +void SerializeFlatBuffersRequest(butil::IOBuf* buf, + Controller* cntl, + const void* request_obj); + +// Pack `request' to `method' into `buf'. +void PackFlatBuffersRequest(butil::IOBuf* buf, + SocketMessage**, + uint64_t correlation_id, + const void* method_descriptor, + Controller* controller, + const butil::IOBuf& request, + const Authenticator* auth); + +} // namespace policy +} // namespace brpc + +#endif // BRPC_POLICY_FLATBUFFERS_PROTOCOL_H \ No newline at end of file diff --git a/src/brpc/policy/http2_rpc_protocol.cpp b/src/brpc/policy/http2_rpc_protocol.cpp index e202d32bc2..25be9f04a2 100644 --- a/src/brpc/policy/http2_rpc_protocol.cpp +++ b/src/brpc/policy/http2_rpc_protocol.cpp @@ -1784,7 +1784,7 @@ void H2UnsentResponse::Print(std::ostream& os) const { void PackH2Request(butil::IOBuf*, SocketMessage** user_message, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf&, const Authenticator* auth) { diff --git a/src/brpc/policy/http2_rpc_protocol.h b/src/brpc/policy/http2_rpc_protocol.h index b4422ee057..4d56c4274a 100644 --- a/src/brpc/policy/http2_rpc_protocol.h +++ b/src/brpc/policy/http2_rpc_protocol.h @@ -132,8 +132,8 @@ inline H2Bvars* get_h2_bvars() { class H2UnsentRequest : public SocketMessage, public StreamUserData { friend void PackH2Request(butil::IOBuf*, SocketMessage**, - uint64_t, const google::protobuf::MethodDescriptor*, - Controller*, const butil::IOBuf&, const Authenticator*); + uint64_t, const void*,Controller*, + const butil::IOBuf&, const Authenticator*); public: static H2UnsentRequest* New(Controller* c); void Print(std::ostream& os) const; @@ -285,7 +285,7 @@ ParseResult ParseH2Message(butil::IOBuf *source, Socket *socket, void PackH2Request(butil::IOBuf* buf, SocketMessage** user_message_out, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index d0150a63fd..ff56307cb0 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -566,7 +566,8 @@ void ProcessHttpResponse(InputMessageBase* msg) { void SerializeHttpRequest(butil::IOBuf* /*not used*/, Controller* cntl, - const google::protobuf::Message* pbreq) { + const void* msg) { + const google::protobuf::Message* pbreq = static_cast(msg); HttpHeader& hreq = cntl->http_request(); const bool is_http2 = (cntl->request_protocol() == PROTOCOL_H2); bool is_grpc = false; @@ -735,7 +736,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, void PackHttpRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf& /*unused*/, const Authenticator* auth) { diff --git a/src/brpc/policy/http_rpc_protocol.h b/src/brpc/policy/http_rpc_protocol.h index bc8bd06593..4ff3e41d76 100644 --- a/src/brpc/policy/http_rpc_protocol.h +++ b/src/brpc/policy/http_rpc_protocol.h @@ -132,11 +132,11 @@ void ProcessHttpResponse(InputMessageBase* msg); bool VerifyHttpRequest(const InputMessageBase* msg); void SerializeHttpRequest(butil::IOBuf* request_buf, Controller* cntl, - const google::protobuf::Message* msg); + const void* msg); void PackHttpRequest(butil::IOBuf* buf, SocketMessage** user_message_out, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index bd0c496027..8e0a5010f6 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -666,10 +666,11 @@ void ProcessHuluResponse(InputMessageBase* msg_base) { void PackHuluRequest(butil::IOBuf* req_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_obj, Controller* cntl, const butil::IOBuf& req_body, const Authenticator* auth) { + const google::protobuf::MethodDescriptor* method = static_cast(method_obj); HuluRpcRequestMeta meta; if (auth != NULL && auth->GenerateCredential( meta.mutable_credential_data()) != 0) { diff --git a/src/brpc/policy/hulu_pbrpc_protocol.h b/src/brpc/policy/hulu_pbrpc_protocol.h index d62f8a6def..e311601b83 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.h +++ b/src/brpc/policy/hulu_pbrpc_protocol.h @@ -41,7 +41,7 @@ bool VerifyHuluRequest(const InputMessageBase* msg); void PackHuluRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_obj, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/memcache_binary_protocol.cpp b/src/brpc/policy/memcache_binary_protocol.cpp index d4c39dfd33..a7ce0de4cb 100644 --- a/src/brpc/policy/memcache_binary_protocol.cpp +++ b/src/brpc/policy/memcache_binary_protocol.cpp @@ -193,7 +193,8 @@ void ProcessMemcacheResponse(InputMessageBase* msg_base) { void SerializeMemcacheRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); if (request == NULL) { return cntl->SetFailed(EREQUEST, "request is NULL"); } @@ -209,7 +210,7 @@ void SerializeMemcacheRequest(butil::IOBuf* buf, void PackMemcacheRequest(butil::IOBuf* buf, SocketMessage**, uint64_t /*correlation_id*/, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf& request, const Authenticator* auth) { diff --git a/src/brpc/policy/memcache_binary_protocol.h b/src/brpc/policy/memcache_binary_protocol.h index f51b80073b..244e453889 100644 --- a/src/brpc/policy/memcache_binary_protocol.h +++ b/src/brpc/policy/memcache_binary_protocol.h @@ -35,13 +35,13 @@ void ProcessMemcacheResponse(InputMessageBase* msg); // Serialize a memcache request. void SerializeMemcacheRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Pack `request' to `method' into `buf'. void PackMemcacheRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/nova_pbrpc_protocol.cpp b/src/brpc/policy/nova_pbrpc_protocol.cpp index 249e35c7a1..ba0a943851 100644 --- a/src/brpc/policy/nova_pbrpc_protocol.cpp +++ b/src/brpc/policy/nova_pbrpc_protocol.cpp @@ -152,7 +152,8 @@ void ProcessNovaResponse(InputMessageBase* msg_base) { } void SerializeNovaRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); CompressType type = cntl->request_compress_type(); if (type != COMPRESS_TYPE_NONE && type != COMPRESS_TYPE_SNAPPY) { cntl->SetFailed(EREQUEST, "nova_pbrpc protocol doesn't support " @@ -165,10 +166,11 @@ void SerializeNovaRequest(butil::IOBuf* buf, Controller* cntl, void PackNovaRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_obj, Controller* controller, const butil::IOBuf& request, const Authenticator* /*not supported*/) { + const google::protobuf::MethodDescriptor* method = static_cast(method_obj); ControllerPrivateAccessor accessor(controller); if (controller->connection_type() == CONNECTION_TYPE_SINGLE) { return controller->SetFailed( diff --git a/src/brpc/policy/nova_pbrpc_protocol.h b/src/brpc/policy/nova_pbrpc_protocol.h index 448c3dbc74..b4a713a2f6 100644 --- a/src/brpc/policy/nova_pbrpc_protocol.h +++ b/src/brpc/policy/nova_pbrpc_protocol.h @@ -30,13 +30,13 @@ namespace policy { void ProcessNovaResponse(InputMessageBase* msg); void SerializeNovaRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Pack `request' to `method' into `buf'. void PackNovaRequest(butil::IOBuf* buf, SocketMessage** user_message_out, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/nshead_mcpack_protocol.cpp b/src/brpc/policy/nshead_mcpack_protocol.cpp index 052fd0f3b7..2849e3df52 100644 --- a/src/brpc/policy/nshead_mcpack_protocol.cpp +++ b/src/brpc/policy/nshead_mcpack_protocol.cpp @@ -137,7 +137,8 @@ void ProcessNsheadMcpackResponse(InputMessageBase* msg_base) { } void SerializeNsheadMcpackRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* pb_req) { + const void* request_obj) { + const google::protobuf::Message* pb_req = static_cast(request_obj); CompressType type = cntl->request_compress_type(); if (type != COMPRESS_TYPE_NONE) { cntl->SetFailed(EREQUEST, @@ -155,7 +156,7 @@ void SerializeNsheadMcpackRequest(butil::IOBuf* buf, Controller* cntl, void PackNsheadMcpackRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* controller, const butil::IOBuf& request, const Authenticator* /*not supported*/) { diff --git a/src/brpc/policy/nshead_mcpack_protocol.h b/src/brpc/policy/nshead_mcpack_protocol.h index bda40319a9..991a2697b2 100644 --- a/src/brpc/policy/nshead_mcpack_protocol.h +++ b/src/brpc/policy/nshead_mcpack_protocol.h @@ -30,13 +30,13 @@ namespace policy { void ProcessNsheadMcpackResponse(InputMessageBase* msg); void SerializeNsheadMcpackRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Pack `request' to `method' into `buf'. void PackNsheadMcpackRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index a26dc96857..9d32610320 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -400,7 +400,8 @@ bool VerifyNsheadRequest(const InputMessageBase* msg_base) { } void SerializeNsheadRequest(butil::IOBuf* request_buf, Controller* cntl, - const google::protobuf::Message* req_base) { + const void* request_obj) { + const google::protobuf::Message* req_base = static_cast(request_obj); if (req_base == NULL) { return cntl->SetFailed(EREQUEST, "request is NULL"); } @@ -426,7 +427,7 @@ void PackNsheadRequest( butil::IOBuf* packet_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf& request, const Authenticator*) { diff --git a/src/brpc/policy/nshead_protocol.h b/src/brpc/policy/nshead_protocol.h index 41e27105ab..29ed19829b 100644 --- a/src/brpc/policy/nshead_protocol.h +++ b/src/brpc/policy/nshead_protocol.h @@ -35,13 +35,13 @@ void ProcessNsheadRequest(InputMessageBase* msg); void ProcessNsheadResponse(InputMessageBase* msg); void SerializeNsheadRequest(butil::IOBuf* request_buf, Controller* controller, - const google::protobuf::Message* request); + const void* request_obj); void PackNsheadRequest( butil::IOBuf* packet_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* controller, const butil::IOBuf&, const Authenticator*); diff --git a/src/brpc/policy/public_pbrpc_protocol.cpp b/src/brpc/policy/public_pbrpc_protocol.cpp index 38a749dc72..c2facdf445 100644 --- a/src/brpc/policy/public_pbrpc_protocol.cpp +++ b/src/brpc/policy/public_pbrpc_protocol.cpp @@ -214,7 +214,8 @@ void ProcessPublicPbrpcResponse(InputMessageBase* msg_base) { } void SerializePublicPbrpcRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); CompressType type = cntl->request_compress_type(); if (type != COMPRESS_TYPE_NONE && type != COMPRESS_TYPE_SNAPPY) { cntl->SetFailed(EREQUEST, "public_pbrpc doesn't support " @@ -227,10 +228,12 @@ void SerializePublicPbrpcRequest(butil::IOBuf* buf, Controller* cntl, void PackPublicPbrpcRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* controller, const butil::IOBuf& request, const Authenticator* /*not supported*/) { + const google::protobuf::MethodDescriptor* method = + static_cast(method_descriptor); PublicPbrpcRequest pbreq; RequestHead* head = pbreq.mutable_requesthead(); RequestBody* body = pbreq.add_requestbody(); diff --git a/src/brpc/policy/public_pbrpc_protocol.h b/src/brpc/policy/public_pbrpc_protocol.h index 3e630a5eeb..3bbf739408 100644 --- a/src/brpc/policy/public_pbrpc_protocol.h +++ b/src/brpc/policy/public_pbrpc_protocol.h @@ -30,13 +30,13 @@ namespace policy { void ProcessPublicPbrpcResponse(InputMessageBase* msg); void SerializePublicPbrpcRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Pack `request' to `method' into `buf'. void PackPublicPbrpcRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/redis_protocol.cpp b/src/brpc/policy/redis_protocol.cpp index f8acf49d6a..aee45b02ba 100644 --- a/src/brpc/policy/redis_protocol.cpp +++ b/src/brpc/policy/redis_protocol.cpp @@ -273,7 +273,8 @@ void ProcessRedisRequest(InputMessageBase* msg_base) { } void SerializeRedisRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); if (request == NULL) { return cntl->SetFailed(EREQUEST, "request is NULL"); } @@ -299,7 +300,7 @@ void SerializeRedisRequest(butil::IOBuf* buf, void PackRedisRequest(butil::IOBuf* buf, SocketMessage**, uint64_t /*correlation_id*/, - const google::protobuf::MethodDescriptor*, + const void*, Controller* cntl, const butil::IOBuf& request, const Authenticator* auth) { diff --git a/src/brpc/policy/redis_protocol.h b/src/brpc/policy/redis_protocol.h index b3f71845cc..35cb28543b 100644 --- a/src/brpc/policy/redis_protocol.h +++ b/src/brpc/policy/redis_protocol.h @@ -42,13 +42,13 @@ void ProcessRedisRequest(InputMessageBase* msg); // Serialize a redis request. void SerializeRedisRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Pack `request' to `method' into `buf'. void PackRedisRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/rtmp_protocol.cpp b/src/brpc/policy/rtmp_protocol.cpp index 8b251eb2de..7fd0f7f463 100644 --- a/src/brpc/policy/rtmp_protocol.cpp +++ b/src/brpc/policy/rtmp_protocol.cpp @@ -3627,7 +3627,7 @@ RtmpCreateStreamMessage::AppendAndDestroySelf(butil::IOBuf* out, Socket* s) { void PackRtmpRequest(butil::IOBuf* /*buf*/, SocketMessage** user_message, uint64_t /*correlation_id*/, - const google::protobuf::MethodDescriptor* /*NULL*/, + const void* /*NULL*/, Controller* cntl, const butil::IOBuf& /*request*/, const Authenticator*) { @@ -3671,7 +3671,7 @@ void PackRtmpRequest(butil::IOBuf* /*buf*/, void SerializeRtmpRequest(butil::IOBuf* /*buf*/, Controller* /*cntl*/, - const google::protobuf::Message* /*NULL*/) { + const void* /*NULL*/) { } } // namespace policy diff --git a/src/brpc/policy/rtmp_protocol.h b/src/brpc/policy/rtmp_protocol.h index b5572c2f18..018ca9b7b2 100644 --- a/src/brpc/policy/rtmp_protocol.h +++ b/src/brpc/policy/rtmp_protocol.h @@ -551,7 +551,7 @@ void ProcessRtmpMessage(InputMessageBase* msg); void PackRtmpRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); @@ -559,7 +559,7 @@ void PackRtmpRequest(butil::IOBuf* buf, // Serialize createStream message void SerializeRtmpRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request); // ============== inline impl. ================= // TODO(gejun): impl. do not work for big-endian machines. diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index 2fb33ed578..877b934a22 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -552,10 +552,12 @@ void ProcessSofaResponse(InputMessageBase* msg_base) { void PackSofaRequest(butil::IOBuf* req_buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* cntl, const butil::IOBuf& req_body, const Authenticator* /*not supported*/) { + const google::protobuf::MethodDescriptor* method = + static_cast(method_descriptor); if (!cntl->request_attachment().empty()) { LOG(WARNING) << "sofa-pbrpc does not support attachment, " "your request_attachment will not be sent"; diff --git a/src/brpc/policy/sofa_pbrpc_protocol.h b/src/brpc/policy/sofa_pbrpc_protocol.h index 715949eb98..76860fa3e9 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.h +++ b/src/brpc/policy/sofa_pbrpc_protocol.h @@ -42,7 +42,7 @@ bool VerifySofaRequest(const InputMessageBase* msg); void PackSofaRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/policy/ubrpc2pb_protocol.cpp b/src/brpc/policy/ubrpc2pb_protocol.cpp index fe2c4619cb..3e7897467d 100644 --- a/src/brpc/policy/ubrpc2pb_protocol.cpp +++ b/src/brpc/policy/ubrpc2pb_protocol.cpp @@ -528,19 +528,21 @@ static void SerializeUbrpcRequest(butil::IOBuf* buf, Controller* cntl, } void SerializeUbrpcCompackRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); return SerializeUbrpcRequest(buf, cntl, request, mcpack2pb::FORMAT_COMPACK); } void SerializeUbrpcMcpack2Request(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); return SerializeUbrpcRequest(buf, cntl, request, mcpack2pb::FORMAT_MCPACK_V2); } void PackUbrpcRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, + const void*, Controller* controller, const butil::IOBuf& request, const Authenticator* /*not supported*/) { diff --git a/src/brpc/policy/ubrpc2pb_protocol.h b/src/brpc/policy/ubrpc2pb_protocol.h index 669222db00..507744b4d5 100644 --- a/src/brpc/policy/ubrpc2pb_protocol.h +++ b/src/brpc/policy/ubrpc2pb_protocol.h @@ -30,14 +30,14 @@ namespace policy { void ProcessUbrpcResponse(InputMessageBase* msg); void SerializeUbrpcCompackRequest(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); void SerializeUbrpcMcpack2Request(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); void PackUbrpcRequest(butil::IOBuf* buf, SocketMessage**, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* controller, const butil::IOBuf& request, const Authenticator* auth); diff --git a/src/brpc/protocol.cpp b/src/brpc/protocol.cpp index 9bb1fde315..e5cdeaa6b4 100644 --- a/src/brpc/protocol.cpp +++ b/src/brpc/protocol.cpp @@ -131,7 +131,8 @@ void ListProtocols(std::vector >* vec) { } void SerializeRequestDefault(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request) { + const void* request_obj) { + const google::protobuf::Message* request = static_cast(request_obj); // Check sanity of request. if (!request) { return cntl->SetFailed(EREQUEST, "`request' is NULL"); diff --git a/src/brpc/protocol.h b/src/brpc/protocol.h index 0492d0b5f7..038aadfd3a 100644 --- a/src/brpc/protocol.h +++ b/src/brpc/protocol.h @@ -101,7 +101,7 @@ struct Protocol { typedef void (*SerializeRequest)( butil::IOBuf* request_buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); SerializeRequest serialize_request; // [Required by client] @@ -113,7 +113,7 @@ struct Protocol { butil::IOBuf* iobuf_out, SocketMessage** user_message_out, uint64_t correlation_id, - const google::protobuf::MethodDescriptor* method, + const void* method_descriptor, Controller* controller, const butil::IOBuf& request_buf, const Authenticator* auth); @@ -198,7 +198,7 @@ void ListProtocols(std::vector >* vec); // The common serialize_request implementation used by many protocols. void SerializeRequestDefault(butil::IOBuf* buf, Controller* cntl, - const google::protobuf::Message* request); + const void* request_obj); // Replacements for msg->ParseFromXXX() to make the bytes limit in pb // consistent with -max_body_size diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index dd155a3044..aee484ff53 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -490,7 +490,7 @@ const Controller* GetSubControllerOfSelectiveChannel( } static void PassSerializeRequest(butil::IOBuf*, Controller*, - const google::protobuf::Message*) { + const void*) { } SelectiveChannel::SelectiveChannel() {} diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 8e2368bcb2..863a4891f4 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -167,6 +167,20 @@ ServerSSLOptions* ServerOptions::mutable_ssl_options() { return _ssl_options.get(); } +Server::FlatBuffersMethodProperty::FlatBuffersMethodProperty() + : is_builtin_service(false) + , own_method_status(false) + , service(NULL) + , method(NULL) + , status(NULL) { +} + +Server::FlatBuffersServiceProperty::FlatBuffersServiceProperty() + :service(NULL) + ,method_count(0) + ,methods_list(NULL){ +} + Server::MethodProperty::OpaqueParams::OpaqueParams() : is_tabbed(false) , allow_default_url(false) @@ -422,6 +436,14 @@ const std::string Server::ServiceProperty::service_name() const { return s_unknown_name; } +const std::string& Server::FlatBuffersServiceProperty::service_name() const { + if (service) { + return service->GetDescriptor()->full_name(); + } + const static std::string s_unknown_name = ""; + return s_unknown_name; +} + Server::Server(ProfilerLinker) : _session_local_data_pool(NULL) , _status(UNINITIALIZED) @@ -1614,6 +1636,72 @@ int Server::AddServiceInternal(google::protobuf::Service* service, return 0; } +int Server::AddServiceInternal(brpc::flatbuffers::Service* service, + bool is_builtin_service, + const ServiceOptions& options) { + if (is_builtin_service) { + LOG(ERROR) << "builtin_service of flatbuffers rpc is not support"; + return -1; + } + if (NULL == service) { + LOG(ERROR) << "Parameter[service] is NULL!"; + return -1; + } + const brpc::flatbuffers::ServiceDescriptor* sd = service->GetDescriptor(); + int method_count = sd->method_count(); + if (method_count <= 0) { + LOG(ERROR) << "service=" << sd->full_name() + << " does not have any method."; + return -1; + } + if (InitializeOnce() != 0) { + LOG(ERROR) << "Fail to initialize Server[" << version() << ']'; + return -1; + } + if (status() != READY) { + LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server[" + << version() << "] which is " << status_str(status()); + return -1; + } + // Check service conflict using service's index + FlatBuffersServiceProperty* c_ss = _fb_server_index_map.seek(sd->index()); + if (c_ss != NULL) { + LOG(ERROR) << "service:" << sd->full_name() + << " with index:"<< sd->index() + << " conflicts with registed service:" << c_ss->service->GetDescriptor()->full_name() + << " Try to change your service name."; + return -1; + } + + // Register ServiceProperty + FlatBuffersServiceProperty ss; + ss.service = service; + ss.method_count = method_count; + ss.methods_list = new FlatBuffersMethodProperty*[method_count]; + if (!ss.methods_list) { + LOG(ERROR) << "Fail to alloc methods_list"; + return -1; + } + memset(ss.methods_list, 0, method_count * sizeof(FlatBuffersMethodProperty*)); + _fb_server_index_map[sd->index()] = ss; + + // Register MethodProperty + for (int i = 0; i < method_count; ++i) { + const brpc::flatbuffers::MethodDescriptor* md = sd->method(i); + FlatBuffersMethodProperty* mp = new FlatBuffersMethodProperty(); + if (!mp) { + LOG(ERROR) << "Fail to alloc FlatBuffersMethodProperty"; + return -1; + } + mp->service = service; + mp->method = md; + mp->status = new MethodStatus; + ss.methods_list[i] = mp; + } + + return 0; +} + ServiceOptions::ServiceOptions() : ownership(SERVER_DOESNT_OWN_SERVICE) , allow_default_url(false) @@ -1651,6 +1739,18 @@ int Server::AddService(google::protobuf::Service* service, return AddServiceInternal(service, false, options); } +int Server::AddService(brpc::flatbuffers::Service* service, + ServiceOwnership ownership) { + ServiceOptions options; + options.ownership = ownership; + return AddServiceInternal(service, false, options); +} + +int Server::AddService(brpc::flatbuffers::Service* service, + const ServiceOptions& options) { + return AddServiceInternal(service, false, options); +} + int Server::AddBuiltinService(google::protobuf::Service* service) { ServiceOptions options; options.ownership = SERVER_OWNS_SERVICE; @@ -2042,6 +2142,27 @@ Server::FindServicePropertyByName(const butil::StringPiece& name) const { return _service_map.seek(name); } +const Server::FlatBuffersServiceProperty* +Server::FindFlatBuffersServicePropertyByIndex(uint32_t service_index) const { + return _fb_server_index_map.seek(service_index); +} + +const Server::FlatBuffersMethodProperty* +Server::FindFlatBufferMethodPropertyByIndex(uint32_t service_index, int method_index) const { + const Server::FlatBuffersServiceProperty* sp = + FindFlatBuffersServicePropertyByIndex(service_index); + if (NULL == sp || NULL == sp->methods_list) { + return NULL; + } + if (method_index < 0 || method_index >= sp->method_count) { + return NULL; + } + if (!sp->methods_list) { + return NULL; + } + return sp->methods_list[method_index]; +} + int Server::AddCertificate(const CertInfo& cert) { if (!_options.has_ssl_options()) { LOG(ERROR) << "ServerOptions.ssl_options is not configured yet"; diff --git a/src/brpc/server.h b/src/brpc/server.h index c262375c67..ed549f167c 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -45,6 +45,7 @@ #include "brpc/concurrency_limiter.h" #include "brpc/baidu_master_service.h" #include "brpc/rpc_pb_message_factory.h" +#include "brpc/details/flatbuffers_impl.h" namespace brpc { @@ -428,6 +429,25 @@ class Server { }; typedef butil::FlatMap MethodMap; + struct FlatBuffersMethodProperty { + bool is_builtin_service; + bool own_method_status; + brpc::flatbuffers::Service* service; + const brpc::flatbuffers::MethodDescriptor* method; + MethodStatus* status; + FlatBuffersMethodProperty(); + }; + + struct FlatBuffersServiceProperty { + brpc::flatbuffers::Service* service; + int method_count; + FlatBuffersMethodProperty** methods_list; + bool is_user_service() const {return false;} + + const std::string& service_name() const; + FlatBuffersServiceProperty(); + }; + struct ThreadLocalOptions { bthread_key_t tls_key; const DataFactory* thread_local_data_factory; @@ -493,6 +513,10 @@ class Server { bool allow_default_url = false); int AddService(google::protobuf::Service* service, const ServiceOptions& options); + int AddService(brpc::flatbuffers::Service* service, + ServiceOwnership ownership); + int AddService(brpc::flatbuffers::Service* service, + const ServiceOptions& options); // Remove a service from this server. // NOTE: removing a service while server is running is forbidden. @@ -628,6 +652,10 @@ friend class Controller; bool is_builtin_service, const ServiceOptions& options); + int AddServiceInternal(brpc::flatbuffers::Service* service, + bool is_builtin_service, + const ServiceOptions& options); + int AddBuiltinService(google::protobuf::Service* service); // Remove all methods of `service' from internal structures. @@ -680,6 +708,12 @@ friend class Controller; const ServiceProperty* FindServicePropertyByName(const butil::StringPiece& name) const; + const FlatBuffersServiceProperty* + FindFlatBuffersServicePropertyByIndex(uint32_t service_index) const; + + const FlatBuffersMethodProperty* + FindFlatBufferMethodPropertyByIndex(uint32_t service_index, int method_index) const; + std::string ServerPrefix() const; // Mapping from hostname to corresponding SSL_CTX @@ -754,6 +788,10 @@ friend class Controller; // uses service->name() to designate an RPC service ServiceMap _service_map; + //used by flatbuffers + typedef butil::FlatMap FlatBuffersServiceIDMap; + FlatBuffersServiceIDMap _fb_server_index_map; + // The only non-builtin service in _service_map, otherwise NULL. google::protobuf::Service* _first_service;