Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 63 additions & 27 deletions src/brpc/builtin/rpcz_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,43 @@ static void PrintElapse(std::ostream& os, int64_t cur_time,

static void PrintAnnotations(
std::ostream& os, int64_t cur_time, int64_t* last_time,
SpanInfoExtractor** extractors, int num_extr) {
SpanInfoExtractor** extractors, int num_extr, const RpczSpan* span) {
int64_t anno_time;
std::string a;
const char* span_type_str = "Span";
if (span) {
switch (span->type()) {
case SPAN_TYPE_SERVER:
span_type_str = "ServerSpan";
break;
case SPAN_TYPE_CLIENT:
span_type_str = "ClientSpan";
break;
case SPAN_TYPE_BTHREAD:
span_type_str = "BthreadSpan";
break;
}
}

// TODO: Going through all extractors is not strictly correct because
// later extractors may have earlier annotations.
for (int i = 0; i < num_extr; ++i) {
while (extractors[i]->PopAnnotation(cur_time, &anno_time, &a)) {
PrintRealTime(os, anno_time);
PrintElapse(os, anno_time, last_time);
os << ' ' << WebEscape(a);
os << ' ';
if (span) {
const char* short_type = "SPAN";
if (span->type() == SPAN_TYPE_SERVER) {
short_type = "Server";
} else if (span->type() == SPAN_TYPE_CLIENT) {
short_type = "Client";
} else if (span->type() == SPAN_TYPE_BTHREAD) {
short_type = "Bthread";
}
os << '[' << short_type << " SPAN#" << Hex(span->span_id()) << "] ";
}
os << WebEscape(a);
if (a.empty() || butil::back_char(a) != '\n') {
os << '\n';
}
Expand All @@ -204,12 +231,12 @@ static void PrintAnnotations(

static bool PrintAnnotationsAndRealTimeSpan(
std::ostream& os, int64_t cur_time, int64_t* last_time,
SpanInfoExtractor** extr, int num_extr) {
SpanInfoExtractor** extr, int num_extr, const RpczSpan* span) {
if (cur_time == 0) {
// the field was not set.
return false;
}
PrintAnnotations(os, cur_time, last_time, extr, num_extr);
PrintAnnotations(os, cur_time, last_time, extr, num_extr, span);
PrintRealTime(os, cur_time);
PrintElapse(os, cur_time, last_time);
return true;
Expand Down Expand Up @@ -239,9 +266,10 @@ static void PrintClientSpan(
extr[num_extr++] = server_extr;
}
extr[num_extr++] = &client_extr;
// start_send_us is always set for client spans.
CHECK(PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(),
last_time, extr, num_extr));
if (!PrintAnnotationsAndRealTimeSpan(os, span.start_send_real_us(),
last_time, extr, num_extr, &span)) {
os << " start_send_real_us:not-set";
}
const Protocol* protocol = FindProtocol(span.protocol());
const char* protocol_name = (protocol ? protocol->name : "Unknown");
const butil::EndPoint remote_side(butil::int2ip(span.remote_ip()), span.remote_port());
Expand Down Expand Up @@ -271,12 +299,12 @@ static void PrintClientSpan(
os << std::endl;

if (PrintAnnotationsAndRealTimeSpan(os, span.sent_real_us(),
last_time, extr, num_extr)) {
os << " Requested(" << span.request_size() << ") [1]" << std::endl;
last_time, extr, num_extr, &span)) {
os << " [Client SPAN#" << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl;
}
if (PrintAnnotationsAndRealTimeSpan(os, span.received_real_us(),
last_time, extr, num_extr)) {
os << " Received response(" << span.response_size() << ")";
last_time, extr, num_extr, &span)) {
os << " [Client SPAN#" << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")";
if (span.base_cid() != 0 && span.ending_cid() != 0) {
int64_t ver = span.ending_cid() - span.base_cid();
if (ver >= 1) {
Expand All @@ -289,18 +317,18 @@ static void PrintClientSpan(
}

if (PrintAnnotationsAndRealTimeSpan(os, span.start_parse_real_us(),
last_time, extr, num_extr)) {
os << " Processing the response in a new bthread" << std::endl;
last_time, extr, num_extr, &span)) {
os << " [Client SPAN#" << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl;
}

if (PrintAnnotationsAndRealTimeSpan(
os, span.start_callback_real_us(),
last_time, extr, num_extr)) {
os << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
last_time, extr, num_extr, &span)) {
os << " [Client SPAN#" << Hex(span.span_id()) << "] " << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
}

PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
last_time, extr, num_extr);
last_time, extr, num_extr, &span);
}

static void PrintClientSpan(std::ostream& os,const RpczSpan& span,
Expand All @@ -318,7 +346,15 @@ static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* la
extr[num_extr++] = server_extr;
}
extr[num_extr++] = &client_extr;
PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr, num_extr);

// Print span id for bthread span context identification
os << " [Bthread SPAN#" << Hex(span.span_id());
if (span.parent_span_id() != 0) {
os << " parent#" << Hex(span.parent_span_id());
}
os << "] ";

PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time, extr, num_extr, &span);
}

static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
Expand Down Expand Up @@ -348,16 +384,16 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
os << std::endl;
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_parse_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
os << " Processing the request in a new bthread" << std::endl;
&last_time, extr, ARRAY_SIZE(extr), &span)) {
os << " [Server SPAN#" << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl;
}

bool entered_user_method = false;
if (PrintAnnotationsAndRealTimeSpan(
os, span.start_callback_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
&last_time, extr, ARRAY_SIZE(extr), &span)) {
entered_user_method = true;
os << " Enter " << WebEscape(span.full_method_name()) << std::endl;
os << " [Server SPAN#" << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl;
}

const int nclient = span.client_spans_size();
Expand All @@ -372,22 +408,22 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,

if (PrintAnnotationsAndRealTimeSpan(
os, span.start_send_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
&last_time, extr, ARRAY_SIZE(extr), &span)) {
if (entered_user_method) {
os << " Leave " << WebEscape(span.full_method_name()) << std::endl;
os << " [Server SPAN#" << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl;
} else {
os << " Responding" << std::endl;
os << " [Server SPAN#" << Hex(span.span_id()) << "] Responding" << std::endl;
}
}

if (PrintAnnotationsAndRealTimeSpan(
os, span.sent_real_us(),
&last_time, extr, ARRAY_SIZE(extr))) {
os << " Responded(" << span.response_size() << ')' << std::endl;
&last_time, extr, ARRAY_SIZE(extr), &span)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please post some of the display information?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As shown below, the type of span (server span, client span, bthread span) is indicated at the beginning to facilitate troubleshooting.

Fetching new trace TRACE#B from HOST#X

Received request REQ#65674 from HOST#Y via PROTO#1 log=0 trace=TRACE#B span=SPAN#S1

[ServerSpan span=SPAN#S1] Processing the request in a new worker
[ServerSpan span=SPAN#S1] ParseStats[cut=?, queue=?, worker=?, rpc=?]
[ServerSpan span=SPAN#S1] Enter Ecom.OrderService.PlaceOrder

[ClientSpan span=SPAN#C1] Cart.AddItem item=ITEM# qty=QTY# price=PRICE# user=USER# session=SESSION#
[ClientSpan span=SPAN#C1] Cart.VerifyOperation item=ITEM# op=OP# user=USER# session=SESSION#
[ClientSpan span=SPAN#C1] Cart.ProposeUpdate item=ITEM# op=OP# user=USER# session=SESSION# attempt=SN#
[ClientSpan span=SPAN#C1] Orchestration.Manager: enqueue step=PlaceOrder first=I# last=I#
[ClientSpan span=SPAN#C1] Orchestration.Queue: pending step

Requesting Ecom.Consensus.Append@HOST#Z PROTO#1 call=CALL# trace=TRACE#B span=SPAN#C1
[ClientSpan span=SPAN#C1] Requested(REQ#65778) [1]
[ClientSpan span=SPAN#C1] Received response(RSP#) of request[1]
[ClientSpan span=SPAN#C1] Processing the response in a new worker
[ClientSpan span=SPAN#C1] Enter client callback

[ClientSpan span=SPAN#C1] Workflow.OnApply, ongoing tasks: 0
[ClientSpan span=SPAN#C1] TaskQueue status running=R# queues=Q# active=A#
[ClientSpan span=SPAN#C1] OrderPipeline.WriteOrder
[ClientSpan span=SPAN#C1] Payment.Authorize
[ClientSpan span=SPAN#C1] Inventory.Reserve
[ClientSpan span=SPAN#C1] Promotion.Apply
[ClientSpan span=SPAN#C1] Promotion.Apply_done
[ClientSpan span=SPAN#C1] Notification.Prepare
[ClientSpan span=SPAN#C1] Notification.Prepare_done
[ClientSpan span=SPAN#C1] Inventory.Reserve_done
[ClientSpan span=SPAN#C1] Payment.Authorize_done
[ClientSpan span=SPAN#C1] OrderPipeline.Sync
[ClientSpan span=SPAN#C1] OrderPipeline.WriteOrder done (~2.7ms)
[ClientSpan span=SPAN#C1] Latency stats: avg≈58us p90≈66us p99≈160us
[ClientSpan span=SPAN#C1] Protocol response before join
[ClientSpan span=SPAN#C1] Protocol response after join

[ClientSpan span=SPAN#C1] FulfillmentWorker: ongoing tasks=3
[ClientSpan span=SPAN#C1] FulfillmentWorker: global queue running=R# queues=Q# active=A#
[ClientSpan span=SPAN#C1] FulfillmentWorker: enter
[ClientSpan span=SPAN#C1] Batcher.append
[ClientSpan span=SPAN#C1] Batcher.flush queue_wait≈~6.7ms

Requesting Ecom.Consensus.Append@HOST#Y PROTO#1 call=CALL# trace=TRACE#B span=SPAN#C2
[ClientSpan span=SPAN#C2] Requested(REQ#65778) [1]
[ClientSpan span=SPAN#C2] Received response(RSP#) of request[1]
[ClientSpan span=SPAN#C2] Processing the response in a new worker
[ClientSpan span=SPAN#C2] Enter client callback

[ServerSpan span=SPAN#S1] Leave Ecom.OrderService.PlaceOrder
[ServerSpan span=SPAN#S1] Responded

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This format is a little bit tedious. How about [Server SPAN#S1] Processing the request in a new worker?

os << " [Server SPAN#" << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl;
}

PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
&last_time, extr, ARRAY_SIZE(extr));
&last_time, extr, ARRAY_SIZE(extr), &span);
}

class RpczSpanFilter : public SpanFilter {
Expand Down
22 changes: 12 additions & 10 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
#include "brpc/details/controller_private_accessor.h"

namespace brpc {

Expand Down Expand Up @@ -490,7 +491,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
}
cntl->set_used_by_rpc();

if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
if (cntl->_sender == NULL && IsTraceable(Span::tls_parent().get())) {
const int64_t start_send_us = butil::cpuwide_time_us();
const std::string* method_name = NULL;
if (_get_method_name) {
Expand All @@ -501,13 +502,16 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
const static std::string NULL_METHOD_STR = "null-method";
method_name = &NULL_METHOD_STR;
}
Span* span = Span::CreateClientSpan(
std::shared_ptr<Span> span = Span::CreateClientSpan(
*method_name, start_send_real_us - start_send_us);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
cntl->_span = span;
if (span) {
ControllerPrivateAccessor accessor(cntl);
span->set_log_id(cntl->log_id());
span->set_base_cid(correlation_id);
span->set_protocol(_options.protocol);
span->set_start_send_us(start_send_us);
accessor.set_span(span);
}
}
// Override some options if they haven't been set by Controller
if (cntl->timeout_ms() == UNSET_MAGIC_NUM) {
Expand Down Expand Up @@ -608,9 +612,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
// be woken up by callback when RPC finishes (succeeds or still
// fails after retry)
Join(correlation_id);
if (cntl->_span) {
cntl->SubmitSpan();
}
cntl->SubmitSpan();
cntl->OnRPCEnd(butil::gettimeofday_us());
}
}
Expand Down
Loading