Skip to content

Commit 623e988

Browse files
authored
Make metadata pod lookups more resilient to short lived processes (#2094)
Summary: Make metadata pod lookups more resilient to short lived processes This is a continuation of the work started from #1989. Since the `local_addr` column is populated for client side traces, it can be used as a fallback lookup for these traces. This doesn't solve all of the permutations of missing short lived processes (#1638), but provides more coverage than before. Relevant Issues: #1638 Type of change: /kind bugfix Test Plan: Verified the following - [x] Compared the performance with and without this change with `src/e2e_test/vizier/exectime:exectime`. This change has a minor performance impact, but it closes the gap on certain situations that previously caused users to distrust Pixie's instrumentation ``` # Performance baseline $ ./exectime benchmark -a testing.getcosmic.ai:443 -c <cluster_id> 2>&1 | tee baseline_for_simple_udf_swap_e20880ffd.txt # Performance of this change ./exectime benchmark -a testing.getcosmic.ai:443 -c <cluster_id> 2>&1 | tee simple_udf_swap_cd217c05c.txt ``` [simple_udf_swap_cd217c05c.txt](https://github.com/user-attachments/files/18497709/simple_udf_swap_cd217c05c.txt) [baseline_for_simple_udf_swap_e20880ffd.txt](https://github.com/user-attachments/files/18497710/baseline_for_simple_udf_swap_e20880ffd.txt) - [x] Ran `for i in $(seq 0 1000); do curl http://google.com/$i; sleep 2; done` within a pod and verified that with this change all traces are shown, without this change a significant number of traces are missed. See before and after screenshots below: ![vizier-0 14 14-curl-with-missing-data](https://github.com/user-attachments/assets/035b5dcf-d87a-4134-84c1-9e478594927b) ![traces-with-new-fallback](https://github.com/user-attachments/assets/2a84ecbb-83cb-45ae-af85-77b1773efb59) Changelog Message: Fix a certain class of cases where Pixie previously missed protocol traces from short lived connections --------- Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
1 parent 1f6d18b commit 623e988

File tree

6 files changed

+327
-9
lines changed

6 files changed

+327
-9
lines changed

src/carnot/funcs/metadata/metadata_ops.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ void RegisterMetadataOpsOrDie(px::carnot::udf::Registry* registry) {
4545
registry->RegisterOrDie<HasServiceNameUDF>("has_service_name");
4646
registry->RegisterOrDie<HasValueUDF>("has_value");
4747
registry->RegisterOrDie<IPToPodIDUDF>("ip_to_pod_id");
48+
registry->RegisterOrDie<UPIDtoPodNameLocalAddrFallback>("_upid_to_podname_local_addr_fallback");
4849
registry->RegisterOrDie<IPToPodIDAtTimeUDF>("ip_to_pod_id");
4950
registry->RegisterOrDie<PodIDToPodNameUDF>("pod_id_to_pod_name");
5051
registry->RegisterOrDie<PodIDToPodLabelsUDF>("pod_id_to_pod_labels");

src/carnot/funcs/metadata/metadata_ops.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2977,6 +2977,38 @@ class IPToPodIDUDF : public ScalarUDF {
29772977
static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_KELVIN; }
29782978
};
29792979

2980+
/**
2981+
* This UDF is a compiler internal function. It should only be used on local IP addresses
2982+
* since this function is forced to run on PEMs. In cases where the IP could be a remote address,
2983+
* then it is more correct to have the function run on Kelvin (IPToPodIDUDF or IPToPodIDAtTimeUDF).
2984+
*/
2985+
class UPIDtoPodNameLocalAddrFallback : public ScalarUDF {
2986+
public:
2987+
/**
2988+
* @brief Gets the pod name from UPID or from local addr if first lookup fails
2989+
*/
2990+
StringValue Exec(FunctionContext* ctx, UInt128Value upid_value, StringValue pod_ip,
2991+
Time64NSValue time) {
2992+
auto md = GetMetadataState(ctx);
2993+
auto pod_info = UPIDtoPod(md, upid_value);
2994+
if (pod_info == nullptr) {
2995+
auto pod_id = md->k8s_metadata_state().PodIDByIPAtTime(pod_ip, time.val);
2996+
pod_info = md->k8s_metadata_state().PodInfoByID(pod_id);
2997+
if (pod_info == nullptr) {
2998+
return "";
2999+
}
3000+
}
3001+
return absl::Substitute("$0/$1", pod_info->ns(), pod_info->name());
3002+
}
3003+
3004+
static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_PEM; }
3005+
3006+
static udf::InfRuleVec SemanticInferenceRules() {
3007+
return {udf::ExplicitRule::Create<UPIDtoPodNameLocalAddrFallback>(
3008+
types::ST_POD_NAME, {types::ST_NONE, types::ST_NONE, types::ST_NONE})};
3009+
}
3010+
};
3011+
29803012
class IPToPodIDAtTimeUDF : public ScalarUDF {
29813013
public:
29823014
/**

src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,29 @@ StatusOr<std::string> ConvertMetadataRule::FindKeyColumn(std::shared_ptr<TableTy
7070
absl::StrJoin(parent_type->ColumnNames(), ","));
7171
}
7272

73+
StatusOr<FuncIR*> AddUPIDToPodNameFallback(std::shared_ptr<TableType> parent_type, IRNode* ir_node,
74+
IR* graph) {
75+
if (!parent_type->HasColumn("local_addr") || !parent_type->HasColumn("time_")) {
76+
return error::NotFound("Parent type does not have required columns for fallback conversion.");
77+
}
78+
PX_ASSIGN_OR_RETURN(auto upid_column, graph->CreateNode<ColumnIR>(ir_node->ast(), "upid", 0));
79+
PX_ASSIGN_OR_RETURN(auto local_addr_column,
80+
graph->CreateNode<ColumnIR>(ir_node->ast(), "local_addr", 0));
81+
PX_ASSIGN_OR_RETURN(auto time_column, graph->CreateNode<ColumnIR>(ir_node->ast(), "time_", 0));
82+
return graph->CreateNode<FuncIR>(
83+
ir_node->ast(),
84+
FuncIR::Op{FuncIR::Opcode::non_op, "", "_upid_to_podname_local_addr_fallback"},
85+
std::vector<ExpressionIR*>{upid_column, local_addr_column, time_column});
86+
}
87+
88+
StatusOr<FuncIR*> AddBackupConversions(std::shared_ptr<TableType> parent_type,
89+
std::string func_name, IRNode* ir_node, IR* graph) {
90+
if (absl::StrContains(func_name, "upid_to_pod_name")) {
91+
return AddUPIDToPodNameFallback(parent_type, ir_node, graph);
92+
}
93+
return error::NotFound("No backup conversion function available for $0", func_name);
94+
}
95+
7396
StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
7497
if (!Match(ir_node, Metadata())) {
7598
return false;
@@ -85,17 +108,30 @@ StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
85108
PX_ASSIGN_OR_RETURN(auto parent, metadata->ReferencedOperator());
86109
PX_ASSIGN_OR_RETURN(auto containing_ops, metadata->ContainingOperators());
87110

111+
auto resolved_table_type = parent->resolved_table_type();
88112
PX_ASSIGN_OR_RETURN(std::string key_column_name,
89-
FindKeyColumn(parent->resolved_table_type(), md_property, ir_node));
90-
91-
PX_ASSIGN_OR_RETURN(ColumnIR * key_column,
92-
graph->CreateNode<ColumnIR>(ir_node->ast(), key_column_name, parent_op_idx));
113+
FindKeyColumn(resolved_table_type, md_property, ir_node));
93114

94115
PX_ASSIGN_OR_RETURN(std::string func_name, md_property->UDFName(key_column_name));
95-
PX_ASSIGN_OR_RETURN(
96-
FuncIR * conversion_func,
97-
graph->CreateNode<FuncIR>(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
98-
std::vector<ExpressionIR*>{key_column}));
116+
117+
// TODO(ddelnano): Until the short lived process issue (gh#1638) is resolved, use a
118+
// conversion function that uses local_addr for pod lookups when the upid based default
119+
// (upid_to_pod_name) fails.
120+
auto backup_conversion_func =
121+
AddBackupConversions(resolved_table_type, func_name, ir_node, graph);
122+
FuncIR* conversion_func = nullptr;
123+
if (backup_conversion_func.ok()) {
124+
conversion_func = backup_conversion_func.ValueOrDie();
125+
}
126+
127+
if (conversion_func == nullptr) {
128+
PX_ASSIGN_OR_RETURN(ColumnIR * key_column, graph->CreateNode<ColumnIR>(
129+
ir_node->ast(), key_column_name, parent_op_idx));
130+
PX_ASSIGN_OR_RETURN(
131+
conversion_func,
132+
graph->CreateNode<FuncIR>(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
133+
std::vector<ExpressionIR*>{key_column}));
134+
}
99135
for (int64_t parent_id : graph->dag().ParentsOf(metadata->id())) {
100136
// For each container node of the metadata expression, update it to point to the
101137
// new conversion func instead.

src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ using table_store::schema::Relation;
3131

3232
using ConvertMetadataRuleTest = RulesTest;
3333

34-
TEST_F(ConvertMetadataRuleTest, multichild) {
34+
TEST_F(ConvertMetadataRuleTest, multichild_without_fallback_func) {
3535
auto relation = Relation(cpu_relation);
3636
MetadataType conversion_column = MetadataType::UPID;
3737
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
@@ -114,6 +114,73 @@ TEST_F(ConvertMetadataRuleTest, missing_conversion_column) {
114114
skip_check_stray_nodes_ = true;
115115
}
116116

117+
TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) {
118+
auto relation = Relation(cpu_relation);
119+
MetadataType conversion_column = MetadataType::UPID;
120+
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
121+
relation.AddColumn(types::DataType::UINT128, conversion_column_str);
122+
relation.AddColumn(types::DataType::STRING, "local_addr");
123+
relation.AddColumn(types::DataType::TIME64NS, "time_");
124+
compiler_state_->relation_map()->emplace("table", relation);
125+
126+
auto metadata_name = "pod_name";
127+
MetadataProperty* property = md_handler->GetProperty(metadata_name).ValueOrDie();
128+
MetadataIR* metadata_ir = MakeMetadataIR(metadata_name, /* parent_op_idx */ 0);
129+
metadata_ir->set_property(property);
130+
131+
auto src = MakeMemSource(relation);
132+
auto map1 = MakeMap(src, {{"md", metadata_ir}});
133+
auto map2 = MakeMap(src, {{"other_col", MakeInt(2)}, {"md", metadata_ir}});
134+
auto filter = MakeFilter(src, MakeEqualsFunc(metadata_ir, MakeString("pl/foobar")));
135+
136+
ResolveTypesRule type_rule(compiler_state_.get());
137+
ASSERT_OK(type_rule.Execute(graph.get()));
138+
139+
ConvertMetadataRule rule(compiler_state_.get());
140+
auto result = rule.Execute(graph.get());
141+
ASSERT_OK(result);
142+
EXPECT_TRUE(result.ValueOrDie());
143+
144+
EXPECT_EQ(0, graph->FindNodesThatMatch(Metadata()).size());
145+
146+
// Check the contents of the new func.
147+
EXPECT_MATCH(filter->filter_expr(), Equals(Func(), String()));
148+
auto converted_md = static_cast<FuncIR*>(filter->filter_expr())->all_args()[0];
149+
EXPECT_MATCH(converted_md, Func());
150+
auto converted_md_func = static_cast<FuncIR*>(converted_md);
151+
EXPECT_EQ("_upid_to_podname_local_addr_fallback", converted_md_func->func_name());
152+
EXPECT_EQ(3, converted_md_func->all_args().size());
153+
auto upid_col = converted_md_func->all_args()[0];
154+
auto local_addr_col = converted_md_func->all_args()[1];
155+
auto time_col = converted_md_func->all_args()[2];
156+
EXPECT_MATCH(upid_col, ColumnNode("upid"));
157+
EXPECT_MATCH(local_addr_col, ColumnNode("local_addr"));
158+
EXPECT_MATCH(time_col, ColumnNode("time_"));
159+
160+
EXPECT_MATCH(converted_md, ResolvedExpression());
161+
EXPECT_MATCH(upid_col, ResolvedExpression());
162+
EXPECT_MATCH(local_addr_col, ResolvedExpression());
163+
EXPECT_MATCH(time_col, ResolvedExpression());
164+
EXPECT_EQ(types::DataType::STRING, converted_md->EvaluatedDataType());
165+
EXPECT_EQ(types::DataType::UINT128, upid_col->EvaluatedDataType());
166+
EXPECT_EQ(types::DataType::STRING, local_addr_col->EvaluatedDataType());
167+
EXPECT_EQ(types::DataType::TIME64NS, time_col->EvaluatedDataType());
168+
EXPECT_EQ(ExpressionIR::Annotations(MetadataType::POD_NAME), converted_md->annotations());
169+
EXPECT_EQ(1, converted_md_func->func_id());
170+
171+
// Check to make sure that all of the operators and expressions depending on the metadata
172+
// now have an updated reference to the func.
173+
EXPECT_EQ(converted_md, map1->col_exprs()[0].node);
174+
EXPECT_EQ(converted_md, map2->col_exprs()[1].node);
175+
176+
// Check that the semantic type of the conversion func is propagated properly.
177+
auto type_or_s = map2->resolved_table_type()->GetColumnType("md");
178+
ASSERT_OK(type_or_s);
179+
auto type = std::static_pointer_cast<ValueType>(type_or_s.ConsumeValueOrDie());
180+
EXPECT_EQ(types::STRING, type->data_type());
181+
EXPECT_EQ(types::ST_POD_NAME, type->semantic_type());
182+
}
183+
117184
} // namespace compiler
118185
} // namespace planner
119186
} // namespace carnot

src/carnot/planner/logical_planner_test.cc

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@ class LogicalPlannerTest : public ::testing::Test {
5454
*query_request.mutable_logical_planner_state() = state;
5555
return query_request;
5656
}
57+
plannerpb::QueryRequest MakeQueryRequestWithExecArgs(
58+
const distributedpb::LogicalPlannerState& state, const std::string& query,
59+
const std::vector<std::string>& exec_funcs) {
60+
plannerpb::QueryRequest query_request;
61+
query_request.set_query_str(query);
62+
*query_request.mutable_logical_planner_state() = state;
63+
for (const auto& exec_func : exec_funcs) {
64+
auto f = query_request.add_exec_funcs();
65+
f->set_func_name(exec_func);
66+
f->set_output_table_prefix(exec_func);
67+
}
68+
return query_request;
69+
}
5770
udfspb::UDFInfo info_;
5871
};
5972

@@ -770,6 +783,73 @@ TEST_F(LogicalPlannerTest, filter_pushdown_bug) {
770783
ASSERT_OK(plan->ToProto());
771784
}
772785

786+
const char kPodNameFallbackConversion[] = R"pxl(
787+
import px
788+
789+
df = px.DataFrame(table='http_events', start_time='-6m')
790+
df.pod = df.ctx['pod']
791+
792+
px.display(df)
793+
)pxl";
794+
TEST_F(LogicalPlannerTest, pod_name_fallback_conversion) {
795+
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
796+
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
797+
ASSERT_OK_AND_ASSIGN(auto plan,
798+
planner->Plan(MakeQueryRequest(state, kPodNameFallbackConversion)));
799+
ASSERT_OK(plan->ToProto());
800+
}
801+
802+
const char kPodNameFallbackConversionWithFilter[] = R"pxl(
803+
import px
804+
805+
df = px.DataFrame(table='http_events', start_time='-6m')
806+
df[df.ctx['pod'] != ""]
807+
808+
px.display(df)
809+
)pxl";
810+
TEST_F(LogicalPlannerTest, pod_name_fallback_conversion_with_filter) {
811+
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
812+
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
813+
ASSERT_OK_AND_ASSIGN(
814+
auto plan, planner->Plan(MakeQueryRequest(state, kPodNameFallbackConversionWithFilter)));
815+
ASSERT_OK(plan->ToProto());
816+
}
817+
818+
// Use a data table that doesn't contain local_addr to test df.ctx['pod'] conversion without
819+
// the fallback conversion.
820+
const char kPodNameConversionWithoutFallback[] = R"pxl(
821+
import px
822+
823+
def cql_flow_graph():
824+
df = px.DataFrame('cql_events', start_time='-5m')
825+
df.pod = df.ctx['pod']
826+
827+
df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
828+
df.is_ra_pod = df.ra_pod != ''
829+
df.ra_name = px.select(df.is_ra_pod, df.ra_pod, df.remote_addr)
830+
831+
df.is_server_tracing = df.trace_role == 2
832+
833+
df.source = px.select(df.is_server_tracing, df.ra_name, df.pod)
834+
df.destination = px.select(df.is_server_tracing, df.pod, df.ra_name)
835+
836+
return df
837+
838+
839+
def cql_summary_with_links():
840+
df = cql_flow_graph()
841+
842+
return df
843+
)pxl";
844+
TEST_F(LogicalPlannerTest, pod_name_conversion_without_fallback) {
845+
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
846+
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
847+
ASSERT_OK_AND_ASSIGN(auto plan, planner->Plan(MakeQueryRequestWithExecArgs(
848+
state, kPodNameConversionWithoutFallback,
849+
{"cql_flow_graph", "cql_summary_with_links"})));
850+
ASSERT_OK(plan->ToProto());
851+
}
852+
773853
const char kHttpDataScript[] = R"pxl(
774854
import px
775855

0 commit comments

Comments
 (0)