diff --git a/example/benchmark_fb/CMakeLists.txt b/example/benchmark_fb/CMakeLists.txt new file mode 100644 index 0000000000..3ab1f8745d --- /dev/null +++ b/example/benchmark_fb/CMakeLists.txt @@ -0,0 +1,139 @@ +cmake_minimum_required(VERSION 2.8.10) +project(benchmark_fb C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) +option(WITH_ASAN "With AddressSanitizer" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +include(FindThreads) +include(FindProtobuf) + +# include current directory for generated files +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) + +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + set(THRIFT_LIB "") +endif() + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if(LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +# Find FlatBuffers +find_path(FLATBUFFERS_INCLUDE_PATH flatbuffers/flatbuffers.h) +find_library(FLATBUFFERS_LIBRARY NAMES flatbuffers) +if((NOT FLATBUFFERS_INCLUDE_PATH) OR (NOT FLATBUFFERS_LIBRARY)) + message(FATAL_ERROR "Fail to find flatbuffers") +endif() +include_directories(${FLATBUFFERS_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +# set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -g -O0 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") +set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}") + +if (WITH_ASAN) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") +endif() + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + ${FLATBUFFERS_LIBRARY} + ${THRIFT_LIB} + dl + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii" + "-Wl,-U,_mallctl" + "-Wl,-U,_malloc_stats_print" + ) +endif() + +set(FLATBUFFERS_SOURCES + test.brpc.fb.cpp + test_generated.h + test.brpc.fb.h +) + +add_executable(client client.cpp ${FLATBUFFERS_SOURCES}) +add_executable(server server.cpp ${FLATBUFFERS_SOURCES}) + +target_link_libraries(client ${BRPC_LIB} ${DYNAMIC_LIB}) +target_link_libraries(server ${BRPC_LIB} ${DYNAMIC_LIB}) \ No newline at end of file diff --git a/example/benchmark_fb/client.cpp b/example/benchmark_fb/client.cpp new file mode 100644 index 0000000000..1303efd4a8 --- /dev/null +++ b/example/benchmark_fb/client.cpp @@ -0,0 +1,130 @@ +#include +#include +#include +#include +#include +#include + +#include "test.brpc.fb.h" + + +DEFINE_int32(thread_num, 1, "Number of threads to send requests"); +DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests"); +DEFINE_int32(request_size, 16, "Bytes of each request"); +DEFINE_string(servers, "0.0.0.0:8002", "IP Address of server"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); + +std::string g_request; +butil::IOBuf g_attachment; + +bvar::LatencyRecorder g_latency_recorder("client"); +bvar::LatencyRecorder g_msg_recorder("msg"); +bvar::Adder g_error_count("client_error_count"); + +static void* sender(void* arg) { + test::BenchmarkServiceStub stub(static_cast(arg)); + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + brpc::Controller cntl; + brpc::flatbuffers::Message response; + + cntl.set_log_id(log_id++); + cntl.request_attachment().append(g_attachment); + + uint64_t msg_begin_ns = butil::cpuwide_time_ns(); + brpc::flatbuffers::MessageBuilder mb; + auto message = mb.CreateString(g_request); + auto req = test::CreateBenchmarkRequest(mb, 123, 333, 1111, 2222, 0, message); + mb.Finish(req); + brpc::flatbuffers::Message request = mb.ReleaseMessage(); + + uint64_t msg_end_ns = butil::cpuwide_time_ns(); + stub.Test(&cntl, &request, &response, NULL); + + if (!cntl.Failed()) { + g_latency_recorder << cntl.latency_us(); + g_msg_recorder << (msg_end_ns - msg_begin_ns); + } else { + g_error_count << 1; + CHECK(brpc::IsAskedToQuit()) + << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); + // We can't connect to the server, sleep a while. Notice that this + // is a specific sleeping to prevent this thread from spinning too + // fast. You should continue the business logic in a production + // server rather than sleeping. + bthread_usleep(50000); + } + } + return NULL; +} + +int main(int argc, char* argv[]) { + GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); + // Print parameter information in one line + LOG(INFO) << "Parameters - request_size : " << FLAGS_request_size + << ", attachment_size: " << FLAGS_attachment_size + << ", thread_num: " << FLAGS_thread_num; + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = "fb_rpc"; + options.connection_type = ""; + options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100); + options.timeout_ms = FLAGS_timeout_ms; + options.max_retry = FLAGS_max_retry; + if (channel.Init(FLAGS_servers.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + if (FLAGS_attachment_size > 0) { + void* _attachment_addr = malloc(FLAGS_attachment_size); + if (!_attachment_addr) { + LOG(ERROR) << "Fail to alloc _attachment from system heap"; + return -1; + } + g_attachment.append(_attachment_addr, FLAGS_attachment_size); + free(_attachment_addr); + } + if (FLAGS_request_size < 0) { + LOG(ERROR) << "Bad request_size=" << FLAGS_request_size; + return -1; + } + g_request.resize(FLAGS_request_size, 'r'); + + if (FLAGS_dummy_port >= 0) { + brpc::StartDummyServerAt(FLAGS_dummy_port); + } + + std::vector bids; + bids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (bthread_start_background(&bids[i], NULL, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create bthread"; + return -1; + } + } + + while (!brpc::IsAskedToQuit()) { + sleep(1); + LOG(INFO) << "Sending EchoRequest at qps=" << (g_latency_recorder.qps(1) / 1000) + << "k latency=" << g_latency_recorder.latency(1) << "us" + << " msg latency=" << g_msg_recorder.latency(1) << "ns"; + } + + LOG(INFO) << "EchoClient is going to quit"; + for (int i = 0; i < FLAGS_thread_num; ++i) { + bthread_join(bids[i], NULL); + } + + LOG(INFO) << "Average QPS: " << (g_latency_recorder.qps()/1000) << "k" + << " Average latency: " << g_latency_recorder.latency() << "us" + << " msg latency: " << g_msg_recorder.latency() << "ns"; + + return 0; +} \ No newline at end of file diff --git a/example/benchmark_fb/server.cpp b/example/benchmark_fb/server.cpp new file mode 100644 index 0000000000..45cc671528 --- /dev/null +++ b/example/benchmark_fb/server.cpp @@ -0,0 +1,74 @@ +#include +#include +#include +#include "test.brpc.fb.h" + +DEFINE_bool(echo_attachment, true, "Echo attachment as well"); +DEFINE_int32(port, 8080, "TCP Port of this server"); +DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s'"); +DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); +DEFINE_int32(internal_port, -1, "Only allow builtin services at this port"); + +namespace test{ +class BenchmarkServiceImpl : public BenchmarkService { +public: + BenchmarkServiceImpl() {} + ~BenchmarkServiceImpl() {} + + void Test(google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request_base, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = + static_cast(controller); + const test::BenchmarkRequest* request = request_base->GetRoot(); + // Set Response Message + brpc::flatbuffers::MessageBuilder mb_; + const char *req_str = request->message()->c_str(); + auto message = mb_.CreateString(req_str); + auto resp = test::CreateBenchmarkResponse(mb_, request->opcode(), + request->echo_attachment(), request->attachment_size(), + request->request_id(),request->reserved(), message); + mb_.Finish(resp); + *response = mb_.ReleaseMessage(); + if (FLAGS_echo_attachment) { + cntl->response_attachment().append(cntl->request_attachment()); + } + } +}; + +} + +int main(int argc, char* argv[]) { + GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); + + // Generally you only need one Server. + brpc::Server server; + + // Instance of your service. + test::BenchmarkServiceImpl benchmark_service_impl; + + if (server.AddService(&benchmark_service_impl, + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + // Start the server. + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + options.max_concurrency = FLAGS_max_concurrency; + options.internal_port = FLAGS_internal_port; + + if (server.Start(FLAGS_port, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server. + server.RunUntilAskedToQuit(); + return 0; + +} \ No newline at end of file diff --git a/example/benchmark_fb/test.brpc.fb.cpp b/example/benchmark_fb/test.brpc.fb.cpp new file mode 100644 index 0000000000..caaf5ac884 --- /dev/null +++ b/example/benchmark_fb/test.brpc.fb.cpp @@ -0,0 +1,72 @@ +// Generated by the BRPC C++ plugin. +// If you make any local change, they will be lost. +// source: test.fbs +#include "test.brpc.fb.h" +#include + +namespace test { + +static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL}; +BenchmarkService::~BenchmarkService() {} +const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() { + if (file_level_service_descriptors_my_2eproto[0] == NULL) { + const brpc::flatbuffers::BrpcDescriptorTable desc_table = { + "test.", "BenchmarkService" , "Test"}; + if (brpc::flatbuffers::parse_service_descriptors(desc_table, &file_level_service_descriptors_my_2eproto[0])) { + std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl; + return NULL; + } + } + return file_level_service_descriptors_my_2eproto[0]; +} + +const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::GetDescriptor() { + return descriptor(); +} + +void BenchmarkService::Test(google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done) { + controller->SetFailed("method Test() not implemented."); + std::cout << "ERROR: " << "method Test() not implemented." << std::endl; +} + +void BenchmarkService::FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done) { + FLATBUFFERS_ASSERT(method->service() == file_level_service_descriptors_my_2eproto[0]); + switch(method->index()) { + case 0: + Test(controller, request, response, done); + break; + default: + std::cout << "ERROR: " << "Bad method index; this should never happen." << std::endl; + break; + } +} +BenchmarkServiceStub::BenchmarkServiceStub(brpc::flatbuffers::RpcChannel* channel) + : channel_(channel), owns_channel_(false) {} + +BenchmarkServiceStub::BenchmarkServiceStub( + brpc::flatbuffers::RpcChannel* channel, + brpc::flatbuffers::Service::ChannelOwnership ownership) + : channel_(channel), + owns_channel_(ownership == brpc::flatbuffers::Service::STUB_OWNS_CHANNEL) {} + +BenchmarkServiceStub::~BenchmarkServiceStub(){ + if (owns_channel_) {delete channel_;} +} + +void BenchmarkServiceStub::Test(google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done) { + channel_->FBCallMethod(descriptor()->method(0), + controller, request, response, done); +} + +} // namespace test + diff --git a/example/benchmark_fb/test.brpc.fb.h b/example/benchmark_fb/test.brpc.fb.h new file mode 100644 index 0000000000..8105a49c32 --- /dev/null +++ b/example/benchmark_fb/test.brpc.fb.h @@ -0,0 +1,54 @@ +// Generated by the BRPC C++ plugin. +// If you make any local change, they will be lost. +// source: test.fbs +#ifndef BRPC_test__INCLUDED +#define BRPC_test__INCLUDED +#include "test_generated.h" +#include +#include + +namespace test { + +class BenchmarkServiceStub; +class BenchmarkService : public brpc::flatbuffers::Service { +protected: + inline BenchmarkService() {}; +public: + virtual ~BenchmarkService(); + static const brpc::flatbuffers::ServiceDescriptor* descriptor(); + virtual void Test(google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done); + + const brpc::flatbuffers::ServiceDescriptor* GetDescriptor(); + void FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done); +private: + FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(BenchmarkService); +}; + +class BenchmarkServiceStub : public BenchmarkService { +public: + BenchmarkServiceStub(brpc::flatbuffers::RpcChannel* channel); + BenchmarkServiceStub(brpc::flatbuffers::RpcChannel* channel, + brpc::flatbuffers::Service::ChannelOwnership ownership); + ~BenchmarkServiceStub(); + inline brpc::flatbuffers::RpcChannel* channel() { return channel_; } + void Test(google::protobuf::RpcController* controller, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done); + +private: + brpc::flatbuffers::RpcChannel* channel_; + bool owns_channel_; + FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(BenchmarkServiceStub); +}; + +} // namespace test + +#endif // BRPC_test__INCLUDED diff --git a/example/benchmark_fb/test.fbs b/example/benchmark_fb/test.fbs new file mode 100644 index 0000000000..6c49e1f116 --- /dev/null +++ b/example/benchmark_fb/test.fbs @@ -0,0 +1,24 @@ +namespace test; + +table BenchmarkRequest { + opcode:int; + echo_attachment:int; + attachment_size:long; + request_id:long; + + reserved:long; + message:string; +} + +table BenchmarkResponse { + opcode:int; + echo_attachment:int; + attachment_size:long; + request_id:long; + reserved:long; + message:string; +} + +rpc_service BenchmarkService { + Test(BenchmarkRequest):BenchmarkResponse; +} \ No newline at end of file diff --git a/example/benchmark_fb/test_generated.h b/example/benchmark_fb/test_generated.h new file mode 100644 index 0000000000..0b37615357 --- /dev/null +++ b/example/benchmark_fb/test_generated.h @@ -0,0 +1,284 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_TEST_TEST_H_ +#define FLATBUFFERS_GENERATED_TEST_TEST_H_ + +#include "flatbuffers/flatbuffers.h" + +// Ensure the included flatbuffers.h is the same version as when this file was +// generated, otherwise it may not be compatible. +static_assert(FLATBUFFERS_VERSION_MAJOR == 25 && + FLATBUFFERS_VERSION_MINOR == 9 && + FLATBUFFERS_VERSION_REVISION == 23, + "Non-compatible flatbuffers version included"); + +namespace test { + +struct BenchmarkRequest; +struct BenchmarkRequestBuilder; + +struct BenchmarkResponse; +struct BenchmarkResponseBuilder; + +struct BenchmarkRequest FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { + typedef BenchmarkRequestBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_OPCODE = 4, + VT_ECHO_ATTACHMENT = 6, + VT_ATTACHMENT_SIZE = 8, + VT_REQUEST_ID = 10, + VT_RESERVED = 12, + VT_MESSAGE = 14 + }; + int32_t opcode() const { + return GetField(VT_OPCODE, 0); + } + bool mutate_opcode(int32_t _opcode = 0) { + return SetField(VT_OPCODE, _opcode, 0); + } + int32_t echo_attachment() const { + return GetField(VT_ECHO_ATTACHMENT, 0); + } + bool mutate_echo_attachment(int32_t _echo_attachment = 0) { + return SetField(VT_ECHO_ATTACHMENT, _echo_attachment, 0); + } + int64_t attachment_size() const { + return GetField(VT_ATTACHMENT_SIZE, 0); + } + bool mutate_attachment_size(int64_t _attachment_size = 0) { + return SetField(VT_ATTACHMENT_SIZE, _attachment_size, 0); + } + int64_t request_id() const { + return GetField(VT_REQUEST_ID, 0); + } + bool mutate_request_id(int64_t _request_id = 0) { + return SetField(VT_REQUEST_ID, _request_id, 0); + } + int64_t reserved() const { + return GetField(VT_RESERVED, 0); + } + bool mutate_reserved(int64_t _reserved = 0) { + return SetField(VT_RESERVED, _reserved, 0); + } + const ::flatbuffers::String *message() const { + return GetPointer(VT_MESSAGE); + } + ::flatbuffers::String *mutable_message() { + return GetPointer<::flatbuffers::String *>(VT_MESSAGE); + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_OPCODE, 4) && + VerifyField(verifier, VT_ECHO_ATTACHMENT, 4) && + VerifyField(verifier, VT_ATTACHMENT_SIZE, 8) && + VerifyField(verifier, VT_REQUEST_ID, 8) && + VerifyField(verifier, VT_RESERVED, 8) && + VerifyOffset(verifier, VT_MESSAGE) && + verifier.VerifyString(message()) && + verifier.EndTable(); + } +}; + +struct BenchmarkRequestBuilder { + typedef BenchmarkRequest Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_opcode(int32_t opcode) { + fbb_.AddElement(BenchmarkRequest::VT_OPCODE, opcode, 0); + } + void add_echo_attachment(int32_t echo_attachment) { + fbb_.AddElement(BenchmarkRequest::VT_ECHO_ATTACHMENT, echo_attachment, 0); + } + void add_attachment_size(int64_t attachment_size) { + fbb_.AddElement(BenchmarkRequest::VT_ATTACHMENT_SIZE, attachment_size, 0); + } + void add_request_id(int64_t request_id) { + fbb_.AddElement(BenchmarkRequest::VT_REQUEST_ID, request_id, 0); + } + void add_reserved(int64_t reserved) { + fbb_.AddElement(BenchmarkRequest::VT_RESERVED, reserved, 0); + } + void add_message(::flatbuffers::Offset<::flatbuffers::String> message) { + fbb_.AddOffset(BenchmarkRequest::VT_MESSAGE, message); + } + explicit BenchmarkRequestBuilder(::flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } +}; + +inline ::flatbuffers::Offset CreateBenchmarkRequest( + ::flatbuffers::FlatBufferBuilder &_fbb, + int32_t opcode = 0, + int32_t echo_attachment = 0, + int64_t attachment_size = 0, + int64_t request_id = 0, + int64_t reserved = 0, + ::flatbuffers::Offset<::flatbuffers::String> message = 0) { + BenchmarkRequestBuilder builder_(_fbb); + builder_.add_reserved(reserved); + builder_.add_request_id(request_id); + builder_.add_attachment_size(attachment_size); + builder_.add_message(message); + builder_.add_echo_attachment(echo_attachment); + builder_.add_opcode(opcode); + return builder_.Finish(); +} + +inline ::flatbuffers::Offset CreateBenchmarkRequestDirect( + ::flatbuffers::FlatBufferBuilder &_fbb, + int32_t opcode = 0, + int32_t echo_attachment = 0, + int64_t attachment_size = 0, + int64_t request_id = 0, + int64_t reserved = 0, + const char *message = nullptr) { + auto message__ = message ? _fbb.CreateString(message) : 0; + return test::CreateBenchmarkRequest( + _fbb, + opcode, + echo_attachment, + attachment_size, + request_id, + reserved, + message__); +} + +struct BenchmarkResponse FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { + typedef BenchmarkResponseBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_OPCODE = 4, + VT_ECHO_ATTACHMENT = 6, + VT_ATTACHMENT_SIZE = 8, + VT_REQUEST_ID = 10, + VT_RESERVED = 12, + VT_MESSAGE = 14 + }; + int32_t opcode() const { + return GetField(VT_OPCODE, 0); + } + bool mutate_opcode(int32_t _opcode = 0) { + return SetField(VT_OPCODE, _opcode, 0); + } + int32_t echo_attachment() const { + return GetField(VT_ECHO_ATTACHMENT, 0); + } + bool mutate_echo_attachment(int32_t _echo_attachment = 0) { + return SetField(VT_ECHO_ATTACHMENT, _echo_attachment, 0); + } + int64_t attachment_size() const { + return GetField(VT_ATTACHMENT_SIZE, 0); + } + bool mutate_attachment_size(int64_t _attachment_size = 0) { + return SetField(VT_ATTACHMENT_SIZE, _attachment_size, 0); + } + int64_t request_id() const { + return GetField(VT_REQUEST_ID, 0); + } + bool mutate_request_id(int64_t _request_id = 0) { + return SetField(VT_REQUEST_ID, _request_id, 0); + } + int64_t reserved() const { + return GetField(VT_RESERVED, 0); + } + bool mutate_reserved(int64_t _reserved = 0) { + return SetField(VT_RESERVED, _reserved, 0); + } + const ::flatbuffers::String *message() const { + return GetPointer(VT_MESSAGE); + } + ::flatbuffers::String *mutable_message() { + return GetPointer<::flatbuffers::String *>(VT_MESSAGE); + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_OPCODE, 4) && + VerifyField(verifier, VT_ECHO_ATTACHMENT, 4) && + VerifyField(verifier, VT_ATTACHMENT_SIZE, 8) && + VerifyField(verifier, VT_REQUEST_ID, 8) && + VerifyField(verifier, VT_RESERVED, 8) && + VerifyOffset(verifier, VT_MESSAGE) && + verifier.VerifyString(message()) && + verifier.EndTable(); + } +}; + +struct BenchmarkResponseBuilder { + typedef BenchmarkResponse Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_opcode(int32_t opcode) { + fbb_.AddElement(BenchmarkResponse::VT_OPCODE, opcode, 0); + } + void add_echo_attachment(int32_t echo_attachment) { + fbb_.AddElement(BenchmarkResponse::VT_ECHO_ATTACHMENT, echo_attachment, 0); + } + void add_attachment_size(int64_t attachment_size) { + fbb_.AddElement(BenchmarkResponse::VT_ATTACHMENT_SIZE, attachment_size, 0); + } + void add_request_id(int64_t request_id) { + fbb_.AddElement(BenchmarkResponse::VT_REQUEST_ID, request_id, 0); + } + void add_reserved(int64_t reserved) { + fbb_.AddElement(BenchmarkResponse::VT_RESERVED, reserved, 0); + } + void add_message(::flatbuffers::Offset<::flatbuffers::String> message) { + fbb_.AddOffset(BenchmarkResponse::VT_MESSAGE, message); + } + explicit BenchmarkResponseBuilder(::flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } +}; + +inline ::flatbuffers::Offset CreateBenchmarkResponse( + ::flatbuffers::FlatBufferBuilder &_fbb, + int32_t opcode = 0, + int32_t echo_attachment = 0, + int64_t attachment_size = 0, + int64_t request_id = 0, + int64_t reserved = 0, + ::flatbuffers::Offset<::flatbuffers::String> message = 0) { + BenchmarkResponseBuilder builder_(_fbb); + builder_.add_reserved(reserved); + builder_.add_request_id(request_id); + builder_.add_attachment_size(attachment_size); + builder_.add_message(message); + builder_.add_echo_attachment(echo_attachment); + builder_.add_opcode(opcode); + return builder_.Finish(); +} + +inline ::flatbuffers::Offset CreateBenchmarkResponseDirect( + ::flatbuffers::FlatBufferBuilder &_fbb, + int32_t opcode = 0, + int32_t echo_attachment = 0, + int64_t attachment_size = 0, + int64_t request_id = 0, + int64_t reserved = 0, + const char *message = nullptr) { + auto message__ = message ? _fbb.CreateString(message) : 0; + return test::CreateBenchmarkResponse( + _fbb, + opcode, + echo_attachment, + attachment_size, + request_id, + reserved, + message__); +} + +} // namespace test + +#endif // FLATBUFFERS_GENERATED_TEST_TEST_H_ diff --git a/src/brpc/details/flatbuffers_common.h b/src/brpc/details/flatbuffers_common.h new file mode 100644 index 0000000000..f7108deb0a --- /dev/null +++ b/src/brpc/details/flatbuffers_common.h @@ -0,0 +1,100 @@ +#ifndef BRPC_FLATBUFFERS_COMMON_H_ +#define BRPC_FLATBUFFERS_COMMON_H_ + +#include +#undef FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS +#define FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(TypeName) \ + TypeName(const TypeName&) = delete; \ + void operator=(const TypeName&) = delete + +namespace google { +namespace protobuf { + class Closure; + class RpcController; +} // namespace protobuf +} // namespace google + +namespace brpc { +namespace flatbuffers { + +class Message; +class Service; +class ServiceDescriptor; +class MethodDescriptor; + +// Abstract interface for an RPC channel. An RpcChannel represents a +// communication line to a Service which can be used to call that Service's +// methods. The Service may be running on another machine. Normally, you +// should not call an RpcChannel directly, but instead construct a stub Service +// wrapping it. Example: +// RpcChannel* channel = new MyRpcChannel("remotehost.example.com:1234"); +// MyService* service = new MyService::Stub(channel); +// service->MyMethod(request, &response, callback); +class RpcChannel { +public: + inline RpcChannel() {} + virtual ~RpcChannel() {} + + // Call the given method of the remote service. The signature of this + // procedure looks the same as Service::CallMethod(), but the requirements + // are less strict in one important way: the request and response objects + // need not be of any specific class as long as their descriptors are + // method->input_type() and method->output_type(). + virtual void FBCallMethod(const MethodDescriptor* method, + google::protobuf::RpcController* controller_base, const Message* request, + Message* response, google::protobuf::Closure* done) = 0; + +private: + FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel); +}; + +class Service { +public: + inline Service() {} + virtual ~Service() {} + + // When constructing a stub, you may pass STUB_OWNS_CHANNEL as the second + // parameter to the constructor to tell it to delete its RpcChannel when + // destroyed. + enum ChannelOwnership { STUB_OWNS_CHANNEL, STUB_DOESNT_OWN_CHANNEL }; + + // Get the ServiceDescriptor describing this service and its methods. + virtual const ServiceDescriptor* GetDescriptor() = 0; + + // Call a method of the service specified by MethodDescriptor. This is + // normally implemented as a simple switch() that calls the standard + // definitions of the service's methods. + // + // Preconditions: + // * method->service() == GetDescriptor() + // * request and response are of the exact same classes as the objects + // returned by GetRequestPrototype(method) and + // GetResponsePrototype(method). + // * After the call has started, the request must not be modified and the + // response must not be accessed at all until "done" is called. + // * "controller" is of the correct type for the RPC implementation being + // used by this Service. For stubs, the "correct type" depends on the + // RpcChannel which the stub is using. Server-side Service + // implementations are expected to accept whatever type of RpcController + // the server-side RPC implementation uses. + // + // Postconditions: + // * "done" will be called when the method is complete. This may be + // before CallMethod() returns or it may be at some point in the future. + // * If the RPC succeeded, "response" contains the response returned by + // the server. + // * If the RPC failed, "response"'s contents are undefined. The + // RpcController can be queried to determine if an error occurred and + // possibly to get more information about the error. + virtual void FBCallMethod(const MethodDescriptor* method, + google::protobuf::RpcController* controller, const Message* request, + Message* response, google::protobuf::Closure* done) = 0; + + private: + FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(Service); +}; + +} // namespace flatbuffers +} // namespace brpc + +#endif // BRPC_FLATBUFFERS_COMMON_H_ \ No newline at end of file diff --git a/src/brpc/details/flatbuffers_impl.cpp b/src/brpc/details/flatbuffers_impl.cpp new file mode 100644 index 0000000000..617e14e850 --- /dev/null +++ b/src/brpc/details/flatbuffers_impl.cpp @@ -0,0 +1,244 @@ +#include +#include +#include +#include +#include + +#include "butil/iobuf.h" +#include "butil/logging.h" +#include "butil/thread_local.h" +#include "butil/third_party/murmurhash3/murmurhash3.h" +#include "brpc/details/flatbuffers_impl.h" + +namespace brpc { +namespace flatbuffers { + +#define METHOD_SPLIT " " +#define PREFIX_SPLIT "." +#define DEFAULT_RESERVE_SIZE 64 // Reserve space for rpc header and meta + +uint8_t *SlabAllocator::allocate(size_t size) { + _old_size_param = size; + size_t real_size = size + DEFAULT_RESERVE_SIZE; + + _full_buf_head = (uint8_t *)_iobuf.allocate(real_size); + _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE; + return _fb_begin_head; +} + +uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p, + size_t old_size, + size_t new_size, + size_t in_use_back, + size_t in_use_front) { + _old_size_param = new_size; + size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE; + + void* old_block = _iobuf.get_cur_block(); + butil::SingleIOBuf::target_block_inc_ref(old_block); + _full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 0); + _fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE; + memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, in_use_front); + butil::SingleIOBuf::target_block_dec_ref(old_block); + return _fb_begin_head; +} + +void SlabAllocator::memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t *new_p, + size_t new_size, size_t in_use_back, + size_t in_use_front) { + memcpy(new_p + new_size - in_use_back, old_p + old_size - in_use_back, + in_use_back); + memcpy(new_p, old_p, in_use_front); +} + +int ServiceDescriptor::init(const BrpcDescriptorTable& table) { + if (table.service_name == "" || table.prefix == "" || + table.method_name_list == "") { + errno = EINVAL; + return -1; + } + std::vector res; + std::string strs = table.method_name_list + METHOD_SPLIT; + size_t pos = strs.find(METHOD_SPLIT); + while(pos != strs.npos) { + std::string tmp = strs.substr(0, pos); + if (tmp != METHOD_SPLIT && tmp != "\n") { + res.push_back(tmp); + } + strs = strs.substr(pos + 1, strs.size()); + pos = strs.find(METHOD_SPLIT); + } + method_count_ = res.size(); + if (method_count_ <= 0) { + errno = EINVAL; + return -1; + } + name_ = table.service_name; + full_name_ = table.prefix + std::string(PREFIX_SPLIT) + name_; + butil::MurmurHash3_x86_32(full_name_.c_str(), full_name_.size(), 1, &index_); + methods_ = new MethodDescriptor*[method_count_]; + if (!methods_) { + return -1; + } + memset(methods_, 0, method_count_ * sizeof(MethodDescriptor*)); + for (int i = 0; i < method_count_; ++i) { + methods_[i] = new MethodDescriptor(table.prefix.c_str(), res[i].c_str(), this, i); + if (!methods_[i]) { + release_descriptor(); + return -1; + } + } + return 0; +} + +void ServiceDescriptor::release_descriptor() { + if (methods_ != NULL ) { + for (int i = 0; i < method_count_; ++i) { + if (methods_[i] != NULL) { + delete methods_[i]; + } + methods_[i] = NULL; + } + delete methods_; + methods_ = NULL; + } +} + +ServiceDescriptor::~ServiceDescriptor() { + release_descriptor(); +} + +const MethodDescriptor* ServiceDescriptor::method(int index) const { + if (index < 0 || index >= method_count_) { + errno = EINVAL; + return NULL; + } + if (!methods_) { + errno = EPERM; + return NULL; + } + return methods_[index]; +} + +MethodDescriptor::MethodDescriptor(const char* prefix, + const char* name, + const ServiceDescriptor* service, + int index) { + name_ = name; + full_name_ = std::string(prefix) + std::string(PREFIX_SPLIT) + std::string(name); + service_ = service; + index_ = index; +} + +Message MessageBuilder::ReleaseMessage() { + const uint8_t *msg_data = buf_.data(); // pointer to msg + uint32_t msg_size = static_cast(buf_.size()); + butil::SingleIOBuf& iobuf = slab_allocator_.get_iobuf(); + const uint8_t *buf_data = (const uint8_t *) iobuf.get_begin(); + // Calculate offsets from the buffer start + // reserve the memory space for rpc header and meta + const uint8_t *data_with_rpc = msg_data - DEFAULT_RESERVE_SIZE; + uint32_t new_msg_size = msg_size; + + new_msg_size += DEFAULT_RESERVE_SIZE; + uint32_t begin = data_with_rpc - buf_data; + + const butil::IOBuf::BlockRef& ref = iobuf.get_cur_ref(); + butil::IOBuf::BlockRef sub_ref = {ref.offset + begin, new_msg_size, ref.block}; + Message msg(sub_ref, DEFAULT_RESERVE_SIZE, msg_size); + Reset(); + return msg; +} + +Message::Message(const butil::SingleIOBuf &iobuf, + uint32_t meta_size = 0, + uint32_t msg_size = 0) + : _iobuf(iobuf) + , _meta_size(meta_size) + , _msg_size(msg_size) { + if (msg_size == 0) { + _msg_size = _iobuf.get_length() - meta_size; + } +} + +Message::Message(const butil::IOBuf::BlockRef &ref, + uint32_t meta_size = 0, + uint32_t msg_size = 0) + : _iobuf(ref) + , _meta_size(meta_size) + , _msg_size(msg_size) { + if (msg_size == 0) { + _msg_size = _iobuf.get_length() - meta_size; + } +} + +bool Message::parse_msg_from_iobuf(const butil::IOBuf& buf, size_t msg_size, size_t meta_size) { + size_t buf_size = buf.size(); + if (buf_size != (msg_size + meta_size)) { + return false; + } + if (!_iobuf.assign(buf, buf_size)) { + return false; + } + _meta_size = meta_size; + _msg_size = msg_size; + + return true; +} + +bool Message::append_msg_to_iobuf(butil::IOBuf& buf) { + _iobuf.append_to(&buf); + return true; +} + +void *Message::reduce_meta_size_and_get_buf(uint32_t new_size) { + if (new_size < _meta_size) { + uint32_t off = _meta_size - new_size; + const butil::IOBuf::BlockRef& ref = _iobuf.get_cur_ref(); + butil::IOBuf::BlockRef sub_ref = {ref.offset + off, ref.length - off, ref.block}; + _iobuf = butil::SingleIOBuf(sub_ref); + _meta_size = new_size; + return const_cast(_iobuf.get_begin()); + } else if (new_size == _meta_size) { + return const_cast(_iobuf.get_begin()); + } + LOG(WARNING) << "You should change DEFAULT_RESERVE_SIZE, new_size=" << new_size << " > _meta_size=" << _meta_size; + return NULL; +} + +int parse_service_descriptors(const BrpcDescriptorTable& descriptor_table, + ServiceDescriptor** descriptor_out) { + if(descriptor_out == NULL) { + errno = EINVAL; + return -1; + } + ServiceDescriptor *out = new ServiceDescriptor; + if (!out) { + return -1; + } + int ret = out->init(descriptor_table); + if (ret < 0) { + delete out; + return -1; + } + *descriptor_out = out; + return 0; +} + +bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaind) { + if (!msg || msg_size == 0 || meta_remaind > msg_size) { + return false; + } + return msg->parse_msg_from_iobuf(buf, msg_size, meta_remaind); +} + +bool SerializeFbToIOBUF(const Message* msg, butil::IOBuf& buf) { + if (!msg) { + return false; + } + Message* msg_ptr = const_cast(msg); + return msg_ptr->append_msg_to_iobuf(buf); +} + +} // namespace flatbuffers +} // namespace brpc \ No newline at end of file diff --git a/src/brpc/details/flatbuffers_impl.h b/src/brpc/details/flatbuffers_impl.h new file mode 100644 index 0000000000..d1414a8fa2 --- /dev/null +++ b/src/brpc/details/flatbuffers_impl.h @@ -0,0 +1,297 @@ +#ifndef BRPC_FLATBUFFERS_IMPL_H_ +#define BRPC_FLATBUFFERS_IMPL_H_ + +#include +#include +#include +#include +#include "butil/iobuf.h" +#include "butil/single_iobuf.h" +#include "brpc/details/flatbuffers_common.h" + +namespace brpc { +namespace flatbuffers { + +class MessageBuilder; + +// Custom allocator for FlatBuffers that uses IOBuf as underlying storage. +// This allocator manages memory allocation for FlatBuffers messages within +// brpc's zero-copy buffer system, enabling efficient serialization and +// deserialization without unnecessary memory copies. +class SlabAllocator : public ::flatbuffers::Allocator { +public: + SlabAllocator() {} + + SlabAllocator(const SlabAllocator &other) = delete; + + SlabAllocator &operator=(const SlabAllocator &other) = delete; + + SlabAllocator(SlabAllocator &&other) { + // default-construct and swap idiom + swap(other); + } + + SlabAllocator &operator=(SlabAllocator &&other) { + // move-construct and swap idiom + SlabAllocator temp(std::move(other)); + swap(temp); + return *this; + } + + void swap(SlabAllocator &other) { + _iobuf.swap(other._iobuf); + } + + virtual ~SlabAllocator() {} + /* + * Allocate memory from the slab allocator. + * buffer struct: fb header + fb message + */ + virtual uint8_t *allocate(size_t size); + + virtual void deallocate(uint8_t *p, size_t size) override { + if (p == _fb_begin_head) { + _iobuf.deallocate((void*)_full_buf_head); + } + } + + void deallocate(void *p) { + _iobuf.deallocate(p); + } + + virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size, + size_t new_size, size_t in_use_back, + size_t in_use_front); +protected: + void memcpy_downward(uint8_t *old_p, size_t old_size, uint8_t *new_p, + size_t new_size, size_t in_use_back, + size_t in_use_front); +private: + + butil::SingleIOBuf &get_iobuf() { + return _iobuf; + } + uint8_t *_full_buf_head; + uint8_t *_fb_begin_head; + size_t _old_size_param; + butil::SingleIOBuf _iobuf; + friend class MessageBuilder; +}; + +// SlabAllocatorMember is a hack to ensure that the MessageBuilder's +// slab_allocator_ member is constructed before the FlatBufferBuilder, since +// the allocator is used in the FlatBufferBuilder ctor. +struct SlabAllocatorMember { + SlabAllocator slab_allocator_; +}; + +// Represents a FlatBuffers message in brpc's zero-copy buffer system. +// This class wraps a FlatBuffers message stored in an IOBuf +// The message is move-only and cannot be copied. +class Message { +public: + Message() : _meta_size(0), _msg_size(0) {} + + Message &operator=(const Message &other) = delete; +private: + Message(const butil::SingleIOBuf &iobuf, + uint32_t meta_size, + uint32_t msg_size); + + Message(const butil::IOBuf::BlockRef &ref, + uint32_t meta_size, + uint32_t msg_size); + friend class MessageBuilder; +public: + Message(Message &&other) = default; + + Message(const Message &other) = delete; + + Message &operator=(Message &&other) = default; + + void *mutable_data() const { + return (void *)const_cast(data()); + } + + const uint8_t *data() const { + const uint8_t *buf = (const uint8_t *)_iobuf.get_begin(); + return buf + _meta_size; + } + + void *mutable_buf_begin() const { + return const_cast(_iobuf.get_begin()); + } + + void *reduce_meta_size_and_get_buf(uint32_t new_size); + + uint32_t get_meta_size() const { + return _meta_size; + } + + size_t size() const { + return _msg_size; + } + + template + bool Verify() const { + ::flatbuffers::Verifier verifier(data(), size()); + return verifier.VerifyBuffer(nullptr); + } + + template T *GetMutableRoot() { return ::flatbuffers::GetMutableRoot(mutable_data()); } + template const T *GetRoot() const { return ::flatbuffers::GetRoot(data()); } + + bool parse_msg_from_iobuf(const butil::IOBuf& buf, size_t msg_size, size_t meta_size); + + bool append_msg_to_iobuf(butil::IOBuf& buf); + + void Clear() { + _iobuf.reset(); + _meta_size = 0; + _msg_size = 0; + } +private: + butil::SingleIOBuf _iobuf; + uint32_t _meta_size; + uint32_t _msg_size; +}; + +// MessageBuilder is a BRPC-specific FlatBufferBuilder that uses SlabAllocator +// to allocate BRPC buffers. +class MessageBuilder : private SlabAllocatorMember, + public ::flatbuffers::FlatBufferBuilder { +public: + explicit MessageBuilder(::flatbuffers::uoffset_t initial_size = 1024) + : ::flatbuffers::FlatBufferBuilder(initial_size, &slab_allocator_, false) {} + + MessageBuilder(const MessageBuilder &other) = delete; + + MessageBuilder &operator=(const MessageBuilder &other) = delete; + + MessageBuilder(MessageBuilder &&other) + : ::flatbuffers::FlatBufferBuilder(1024, &slab_allocator_, false) { + // Default construct and swap idiom. + Swap(other); + } + + /// Create a MessageBuilder from a FlatBufferBuilder. + explicit MessageBuilder(::flatbuffers::FlatBufferBuilder &&src, + void (*dealloc)(void *) = NULL) + : ::flatbuffers::FlatBufferBuilder(1024, &slab_allocator_, false) { + src.Swap(*this); + src.SwapBufAllocator(*this); + if (buf_.capacity()) { + uint8_t *buf = buf_.scratch_data(); // pointer to memory + size_t capacity = buf_.capacity(); // size of memory + slab_allocator_._iobuf.assign_user_data((void*)buf, capacity, dealloc); + } else { + slab_allocator_._iobuf.reset(); + } + } + + /// Move-assign a FlatBufferBuilder to a MessageBuilder. + /// Only FlatBufferBuilder with default allocator (basically, nullptr) is + /// supported. + MessageBuilder &operator=(::flatbuffers::FlatBufferBuilder &&src) { + // Move construct a temporary and swap + MessageBuilder temp(std::move(src)); + Swap(temp); + return *this; + } + + MessageBuilder &operator=(MessageBuilder &&other) { + // Move construct a temporary and swap + MessageBuilder temp(std::move(other)); + Swap(temp); + return *this; + } + + void Swap(MessageBuilder &other) { + slab_allocator_.swap(other.slab_allocator_); + ::flatbuffers::FlatBufferBuilder::Swap(other); + // After swapping the FlatBufferBuilder, we swap back the allocator, which + // restores the original allocator back in place. This is necessary because + // MessageBuilder's allocator is its own member (SlabAllocatorMember). The + // allocator passed to FlatBufferBuilder::vector_downward must point to this + // member. + buf_.swap_allocator(other.buf_); + } + + ~MessageBuilder() {} + + // GetMessage extracts the subslab of the buffer corresponding to the + // flatbuffers-encoded region and wraps it in a `Message` to handle buffer + // ownership. + // Message GetMessage() = delete; + + Message ReleaseMessage(); +}; + +struct BrpcDescriptorTable { + const std::string prefix; + const std::string service_name; + const std::string method_name_list; // split by blank +}; + +class ServiceDescriptor { +public: + ServiceDescriptor() : name_("") + ,full_name_("") + ,methods_(NULL) + ,method_count_(0) + ,index_(0) {} + ~ServiceDescriptor(); + int init(const BrpcDescriptorTable& table); + // The name of the service, not including its containing scope. + const std::string& name() const {return name_;} + // The fully-qualified name of the service, scope delimited by periods. + const std::string& full_name() const {return full_name_;} + // Index of this service within the file's services array. + uint32_t index() const {return index_;} + // The number of methods this service defines. + int method_count() const {return method_count_;} + // Gets a MethodDescriptor by index, where 0 <= index < method_count(). + // These are returned in the order they were defined in the .proto file. + const MethodDescriptor* method(int index) const; +private: + void release_descriptor(); + std::string name_; + std::string full_name_; + MethodDescriptor** methods_; + int method_count_; + uint32_t index_; + FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(ServiceDescriptor); +}; + +int parse_service_descriptors(const BrpcDescriptorTable& descriptor_table, + ServiceDescriptor** descriptor_out); + +class MethodDescriptor { +public: + MethodDescriptor(const char* prefix, + const char* name, + const ServiceDescriptor* service, + int index); + // Name of this method, not including containing scope. + const std::string& name() const {return name_;} + // The fully-qualified name of the method, scope delimited by periods. + const std::string& full_name() const {return full_name_;} + // Index within the service's Descriptor. + int index() const {return index_;} + const ServiceDescriptor* service() const { return service_;} +private: + std::string name_; + std::string full_name_; + const ServiceDescriptor* service_; + int index_; + FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(MethodDescriptor); +}; + +bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaind = 0); + +bool SerializeFbToIOBUF(const Message* msg, butil::IOBuf& buf); + +} // namespace flatbuffers +} // namespace brpc + +#endif // BRPC_FLATBUFFERS_IMPL_H_ \ No newline at end of file diff --git a/src/butil/single_iobuf.cpp b/src/butil/single_iobuf.cpp index c51e8fff1d..2a0bede839 100644 --- a/src/butil/single_iobuf.cpp +++ b/src/butil/single_iobuf.cpp @@ -214,8 +214,8 @@ bool SingleIOBuf::assign(const IOBuf& buf, uint32_t msg_size) { } const IOBuf::BlockRef& ref = buf._front_ref(); + reset(); if (ref.length >= msg_size) { - reset(); _cur_ref.offset = ref.offset; _cur_ref.length = msg_size; _cur_ref.block = ref.block; @@ -226,7 +226,6 @@ bool SingleIOBuf::assign(const IOBuf& buf, uint32_t msg_size) { if (!b) { return false; } - reset(); char* out = b->data + b->size; const size_t nref = buf.backing_block_num(); uint32_t last_len = msg_size;