Skip to content

Commit eec70f4

Browse files
author
Rafał Hibner
committed
Check schema for matched pieps
fix init & typo
1 parent 9436af3 commit eec70f4

File tree

2 files changed

+8
-15
lines changed

2 files changed

+8
-15
lines changed

cpp/src/arrow/acero/pipe_node.cc

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ class PipeSinkNode : public ExecNode {
141141
return pipe_->InputFinished(total_batches);
142142
}
143143

144-
Status Init() override { return pipe_->Init(inputs_[0]->output_schema()); }
144+
Status Init() override {
145+
ARROW_RETURN_NOT_OK(pipe_->Init(inputs_[0]->output_schema()));
146+
return ExecNode::Init();
147+
}
145148

146149
Status StartProducing() override { return Status::OK(); }
147150

@@ -318,24 +321,14 @@ void Pipe::addSource(PipeSource* source) {
318321
Status Pipe::Init(const std::shared_ptr<Schema> schema) {
319322
for (auto node : plan_->nodes()) {
320323
if (node->kind_name() == PipeSourceNode::kKindName) {
321-
if (!schema->Equals(node->output_schema())) {
322-
return Status::Invalid("Pipe sechma does not match for " + pipe_name_);
323-
}
324324
PipeSourceNode* pipe_source = checked_cast<PipeSourceNode*>(node);
325325
if (pipe_source->pipe_name_ == pipe_name_) {
326+
if (!schema->Equals(node->output_schema())) {
327+
return Status::Invalid("Pipe schema does not match for " + pipe_name_);
328+
}
326329
addSource(pipe_source);
327-
// std::cout << std::string(node->kind_name()) + ":! " + node->label()
328-
// << std::endl;
329330
}
330-
// else {
331-
// std::cout << std::string(node->kind_name()) + ":+ " + node->label()
332-
// << std::endl;
333-
// }
334331
}
335-
// else {
336-
// std::cout << std::string(node->kind_name()) + ":- " + node->label() <<
337-
// std::endl;
338-
// }
339332
}
340333
return Status::OK();
341334
}

cpp/src/arrow/acero/pipe_node_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ TEST(ExecPlanExecution, PipeErrorSink) {
123123
plan->StartProducing();
124124
ASSERT_THAT(plan->finished().result().status(),
125125
Raises(StatusCode::Invalid,
126-
HasSubstr("Pipe sechma does not match for named_pipe_1")));
126+
HasSubstr("Pipe schema does not match for named_pipe_1")));
127127

128128
Declaration dup3 = Declaration::Sequence(
129129
{{"pipe_source",

0 commit comments

Comments
 (0)