Skip to content

Commit 920baba

Browse files
authored
Add OTel log support to Carnot plan protos and implement exec node support (#2161)
Summary: Add OTel log support to Carnot plan protos and implement exec node support My upcoming Kubecon talk will be demoing functionality that would benefit from OTel export log support. This is the first step in supporting this. The next set of PRs will include the planner and pxl frontend changes to leverage these changes. Relevant Issues: #705 Type of change: /kind feature Test Plan: New tests verify added functionality --------- Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
1 parent eb2722d commit 920baba

28 files changed

+1189
-282
lines changed

bazel/external/opentelemetry.BUILD

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,42 @@ cc_grpc_library(
123123
grpc_only = True,
124124
deps = [":metrics_service_proto_cc"],
125125
)
126+
127+
proto_library(
128+
name = "logs_proto",
129+
srcs = [
130+
"opentelemetry/proto/logs/v1/logs.proto",
131+
],
132+
deps = [
133+
":common_proto",
134+
":resource_proto",
135+
],
136+
)
137+
138+
cc_proto_library(
139+
name = "logs_proto_cc",
140+
deps = [":logs_proto"],
141+
)
142+
143+
proto_library(
144+
name = "logs_service_proto",
145+
srcs = [
146+
"opentelemetry/proto/collector/logs/v1/logs_service.proto",
147+
],
148+
deps = [
149+
":logs_proto",
150+
],
151+
)
152+
153+
cc_proto_library(
154+
name = "logs_service_proto_cc",
155+
deps = [":logs_service_proto"],
156+
)
157+
158+
cc_grpc_library(
159+
name = "logs_service_grpc_cc",
160+
srcs = [":logs_service_proto"],
161+
generate_mocks = True,
162+
grpc_only = True,
163+
deps = [":logs_service_proto_cc"],
164+
)

src/carnot/engine_state.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class EngineState : public NotCopyable {
8787
[this](const std::string& remote_addr, bool insecure) {
8888
return TraceStubGenerator(remote_addr, insecure);
8989
},
90+
[this](const std::string& remote_addr, bool insecure) {
91+
return LogsStubGenerator(remote_addr, insecure);
92+
},
9093
query_id, model_pool_.get(), grpc_router_, add_auth_to_grpc_context_func_, metrics_.get());
9194
}
9295
std::shared_ptr<grpc::Channel> CreateChannel(const std::string& remote_addr, bool insecure) {
@@ -115,6 +118,12 @@ class EngineState : public NotCopyable {
115118
CreateChannel(remote_addr, insecure));
116119
}
117120

121+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>
122+
LogsStubGenerator(const std::string& remote_addr, bool insecure) {
123+
return opentelemetry::proto::collector::logs::v1::LogsService::NewStub(
124+
CreateChannel(remote_addr, insecure));
125+
}
126+
118127
std::unique_ptr<plan::PlanState> CreatePlanState() {
119128
return std::make_unique<plan::PlanState>(func_registry_.get());
120129
}

src/carnot/exec/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pl_cc_library(
4747
"//src/table_store/table:cc_library",
4848
"@com_github_apache_arrow//:arrow",
4949
"@com_github_grpc_grpc//:grpc++",
50+
"@com_github_opentelemetry_proto//:logs_service_grpc_cc",
5051
"@com_github_opentelemetry_proto//:metrics_service_grpc_cc",
5152
"@com_github_opentelemetry_proto//:trace_service_grpc_cc",
5253
"@com_github_rlyeh_sole//:sole",

src/carnot/exec/agg_node_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ std::unique_ptr<ExecState> MakeTestExecState(udf::Registry* registry) {
476476
auto table_store = std::make_shared<table_store::TableStore>();
477477
return std::make_unique<ExecState>(registry, table_store, MockResultSinkStubGenerator,
478478
MockMetricsStubGenerator, MockTraceStubGenerator,
479-
sole::uuid4(), nullptr);
479+
MockLogStubGenerator, sole::uuid4(), nullptr);
480480
}
481481

482482
std::unique_ptr<plan::Operator> PlanNodeFromPbtxt(const std::string& pbtxt) {

src/carnot/exec/empty_source_node_test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class EmptySourceNodeTest : public ::testing::Test {
4848
void SetUp() override {
4949
func_registry_ = std::make_unique<udf::Registry>("test_registry");
5050
auto table_store = std::make_shared<table_store::TableStore>();
51-
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
52-
MockResultSinkStubGenerator, MockMetricsStubGenerator,
53-
MockTraceStubGenerator, sole::uuid4(), nullptr);
51+
exec_state_ = std::make_unique<ExecState>(
52+
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
53+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
5454
}
5555

5656
std::unique_ptr<ExecState> exec_state_;

src/carnot/exec/equijoin_node_test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ class JoinNodeTest : public ::testing::Test {
5252
JoinNodeTest() {
5353
func_registry_ = std::make_unique<udf::Registry>("test_registry");
5454
auto table_store = std::make_shared<table_store::TableStore>();
55-
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
56-
MockResultSinkStubGenerator, MockMetricsStubGenerator,
57-
MockTraceStubGenerator, sole::uuid4(), nullptr);
55+
exec_state_ = std::make_unique<ExecState>(
56+
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
57+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
5858
}
5959

6060
protected:

src/carnot/exec/exec_graph_test.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ class BaseExecGraphTest : public ::testing::Test {
7474
func_registry_->RegisterOrDie<MultiplyUDF>("multiply");
7575

7676
auto table_store = std::make_shared<table_store::TableStore>();
77-
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
78-
MockResultSinkStubGenerator, MockMetricsStubGenerator,
79-
MockTraceStubGenerator, sole::uuid4(), nullptr);
77+
exec_state_ = std::make_unique<ExecState>(
78+
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
79+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
8080
}
8181

8282
std::unique_ptr<udf::Registry> func_registry_;
@@ -174,7 +174,7 @@ TEST_P(ExecGraphExecuteTest, execute) {
174174
table_store->AddTable("numbers", table);
175175
auto exec_state_ = std::make_unique<ExecState>(
176176
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
177-
MockTraceStubGenerator, sole::uuid4(), nullptr);
177+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
178178

179179
EXPECT_OK(exec_state_->AddScalarUDF(
180180
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::FLOAT64})));
@@ -255,7 +255,7 @@ TEST_F(ExecGraphTest, execute_time) {
255255

256256
auto exec_state_ = std::make_unique<ExecState>(
257257
func_registry.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
258-
MockTraceStubGenerator, sole::uuid4(), nullptr);
258+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
259259

260260
EXPECT_OK(exec_state_->AddScalarUDF(
261261
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::FLOAT64})));
@@ -322,7 +322,7 @@ TEST_F(ExecGraphTest, two_limits_dont_interfere) {
322322
table_store->AddTable("numbers", table);
323323
auto exec_state_ = std::make_unique<ExecState>(
324324
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
325-
MockTraceStubGenerator, sole::uuid4(), nullptr);
325+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
326326

327327
ExecutionGraph e;
328328
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
@@ -390,7 +390,7 @@ TEST_F(ExecGraphTest, limit_w_multiple_srcs) {
390390
table_store->AddTable("numbers", table);
391391
auto exec_state_ = std::make_unique<ExecState>(
392392
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
393-
MockTraceStubGenerator, sole::uuid4(), nullptr);
393+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
394394

395395
ExecutionGraph e;
396396
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
@@ -452,7 +452,7 @@ TEST_F(ExecGraphTest, two_sequential_limits) {
452452
table_store->AddTable("numbers", table);
453453
auto exec_state_ = std::make_unique<ExecState>(
454454
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
455-
MockTraceStubGenerator, sole::uuid4(), nullptr);
455+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
456456

457457
ExecutionGraph e;
458458
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
@@ -515,7 +515,7 @@ TEST_F(ExecGraphTest, execute_with_two_limits) {
515515
table_store->AddTable("numbers", table);
516516
auto exec_state_ = std::make_unique<ExecState>(
517517
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
518-
MockTraceStubGenerator, sole::uuid4(), nullptr);
518+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
519519

520520
ExecutionGraph e;
521521
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
@@ -702,7 +702,7 @@ class GRPCExecGraphTest : public ::testing::Test {
702702
auto table_store = std::make_shared<table_store::TableStore>();
703703
exec_state_ = std::make_unique<ExecState>(
704704
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
705-
MockTraceStubGenerator, sole::uuid4(), nullptr, grpc_router_.get());
705+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr, grpc_router_.get());
706706
}
707707

708708
void SetUpPlanFragment() {

src/carnot/exec/exec_state.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "src/shared/metadata/metadata_state.h"
3838
#include "src/table_store/table/table_store.h"
3939

40+
#include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
4041
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
4142
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
4243
#include "src/carnot/carnotpb/carnot.grpc.pb.h"
@@ -54,6 +55,9 @@ using MetricsStubGenerator = std::function<
5455
using TraceStubGenerator = std::function<
5556
std::unique_ptr<opentelemetry::proto::collector::trace::v1::TraceService::StubInterface>(
5657
const std::string& address, bool insecure)>;
58+
using LogsStubGenerator = std::function<
59+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>(
60+
const std::string& address, bool insecure)>;
5761

5862
/**
5963
* ExecState manages the execution state for a single query. A new one will
@@ -69,15 +73,16 @@ class ExecState {
6973
udf::Registry* func_registry, std::shared_ptr<table_store::TableStore> table_store,
7074
const ResultSinkStubGenerator& stub_generator,
7175
const MetricsStubGenerator& metrics_stub_generator,
72-
const TraceStubGenerator& trace_stub_generator, const sole::uuid& query_id,
73-
udf::ModelPool* model_pool, GRPCRouter* grpc_router = nullptr,
76+
const TraceStubGenerator& trace_stub_generator, const LogsStubGenerator& logs_stub_generator,
77+
const sole::uuid& query_id, udf::ModelPool* model_pool, GRPCRouter* grpc_router = nullptr,
7478
std::function<void(grpc::ClientContext*)> add_auth_func = [](grpc::ClientContext*) {},
7579
ExecMetrics* exec_metrics = nullptr)
7680
: func_registry_(func_registry),
7781
table_store_(std::move(table_store)),
7882
stub_generator_(stub_generator),
7983
metrics_stub_generator_(metrics_stub_generator),
8084
trace_stub_generator_(trace_stub_generator),
85+
logs_stub_generator_(logs_stub_generator),
8186
query_id_(query_id),
8287
model_pool_(model_pool),
8388
grpc_router_(grpc_router),
@@ -157,6 +162,19 @@ class ExecState {
157162
trace_service_stubs_pool_.push_back(std::move(stub_));
158163
return raw;
159164
}
165+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* LogsServiceStub(
166+
const std::string& remote_address, bool insecure) {
167+
if (logs_service_stub_map_.contains(remote_address)) {
168+
return logs_service_stub_map_[remote_address];
169+
}
170+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface> stub_ =
171+
logs_stub_generator_(remote_address, insecure);
172+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* raw = stub_.get();
173+
logs_service_stub_map_[remote_address] = raw;
174+
// Push to the pool.
175+
logs_service_stubs_pool_.push_back(std::move(stub_));
176+
return raw;
177+
}
160178

161179
udf::ScalarUDFDefinition* GetScalarUDFDefinition(int64_t id) { return id_to_scalar_udf_map_[id]; }
162180

@@ -209,6 +227,7 @@ class ExecState {
209227
const ResultSinkStubGenerator stub_generator_;
210228
const MetricsStubGenerator metrics_stub_generator_;
211229
const TraceStubGenerator trace_stub_generator_;
230+
const LogsStubGenerator logs_stub_generator_;
212231
std::map<int64_t, udf::ScalarUDFDefinition*> id_to_scalar_udf_map_;
213232
std::map<int64_t, udf::UDADefinition*> id_to_uda_map_;
214233
const sole::uuid query_id_;
@@ -239,6 +258,15 @@ class ExecState {
239258
absl::flat_hash_map<std::string,
240259
opentelemetry::proto::collector::trace::v1::TraceService::StubInterface*>
241260
trace_service_stub_map_;
261+
262+
std::vector<
263+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>>
264+
logs_service_stubs_pool_;
265+
absl::flat_hash_map<std::string,
266+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface*>
267+
logs_service_stub_map_;
268+
269+
types::Time64NSValue time_now_;
242270
};
243271

244272
} // namespace exec

src/carnot/exec/expression_evaluator_benchmark.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
using ScalarExpression = px::carnot::plan::ScalarExpression;
4747
using ScalarExpressionVector = std::vector<std::shared_ptr<ScalarExpression>>;
4848
using px::carnot::exec::ExecState;
49+
using px::carnot::exec::MockLogStubGenerator;
4950
using px::carnot::exec::MockMetricsStubGenerator;
5051
using px::carnot::exec::MockResultSinkStubGenerator;
5152
using px::carnot::exec::MockTraceStubGenerator;
@@ -85,7 +86,7 @@ void BM_ScalarExpressionTwoCols(benchmark::State& state,
8586
PX_CHECK_OK(func_registry->Register<AddUDF>("add"));
8687
auto exec_state = std::make_unique<ExecState>(
8788
func_registry.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
88-
MockTraceStubGenerator, sole::uuid4(), nullptr);
89+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
8990
EXPECT_OK(exec_state->AddScalarUDF(
9091
0, "add",
9192
std::vector<px::types::DataType>({px::types::DataType::INT64, px::types::DataType::INT64})));

src/carnot/exec/expression_evaluator_test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ class ScalarExpressionTest : public ::testing::TestWithParam<ScalarExpressionEva
116116

117117
EXPECT_TRUE(func_registry_->Register<AddUDF>("add").ok());
118118
EXPECT_TRUE(func_registry_->Register<InitArgUDF>("init_arg").ok());
119-
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
120-
MockResultSinkStubGenerator, MockMetricsStubGenerator,
121-
MockTraceStubGenerator, sole::uuid4(), nullptr);
119+
exec_state_ = std::make_unique<ExecState>(
120+
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
121+
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
122122
EXPECT_OK(exec_state_->AddScalarUDF(
123123
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::INT64})));
124124
EXPECT_OK(

0 commit comments

Comments
 (0)