Skip to content

Commit 35559cc

Browse files
author
Rafał Hibner
committed
TeeNode preserve order
1 parent dcd07c6 commit 35559cc

File tree

1 file changed

+27
-7
lines changed

1 file changed

+27
-7
lines changed

cpp/src/arrow/dataset/file_base.cc

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "arrow/dataset/file_base.h"
1919

20+
#include "arrow/acero/accumulation_queue.h"
2021
#include "arrow/acero/exec_plan.h"
2122

2223
#include <algorithm>
@@ -555,24 +556,30 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
555556
auto node,
556557
// to preserve order explicitly sequence the exec batches
557558
// this requires exec batch index to be set upstream (e.g. by SourceNode)
558-
acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
559-
acero::ConsumingSinkNodeOptions{
560-
std::move(consumer),
561-
{},
562-
/*sequence_output=*/write_options.preserve_order}));
559+
acero::MakeExecNode(
560+
"consuming_sink", plan, std::move(inputs),
561+
acero::ConsumingSinkNodeOptions{
562+
std::move(consumer),
563+
{},
564+
/*sequence_output=*/write_node_options.write_options.preserve_order}));
563565

564566
return node;
565567
}
566568

567569
namespace {
568570

569-
class TeeNode : public acero::MapNode {
571+
class TeeNode : public acero::MapNode,
572+
public arrow::acero::util::SerialSequencingQueue::Processor {
570573
public:
571574
TeeNode(acero::ExecPlan* plan, std::vector<acero::ExecNode*> inputs,
572575
std::shared_ptr<Schema> output_schema,
573576
FileSystemDatasetWriteOptions write_options)
574577
: MapNode(plan, std::move(inputs), std::move(output_schema)),
575-
write_options_(std::move(write_options)) {}
578+
write_options_(std::move(write_options)) {
579+
if (write_options.preserve_order) {
580+
sequencer_ = acero::util::SerialSequencingQueue::Make(this);
581+
}
582+
}
576583

577584
Status StartProducing() override {
578585
ARROW_ASSIGN_OR_RAISE(
@@ -602,6 +609,18 @@ class TeeNode : public acero::MapNode {
602609

603610
const char* kind_name() const override { return "TeeNode"; }
604611

612+
Status InputReceived(ExecNode* input, ExecBatch batch) override {
613+
DCHECK_EQ(input, inputs_[0]);
614+
if (sequencer_) {
615+
return sequencer_->InsertBatch(std::move(batch));
616+
}
617+
return Process(std::move(batch));
618+
}
619+
620+
Status Process(ExecBatch batch) override {
621+
return acero::MapNode::InputReceived(inputs_[0], batch);
622+
}
623+
605624
void Finish() override { dataset_writer_->Finish(); }
606625

607626
Result<compute::ExecBatch> ProcessBatch(compute::ExecBatch batch) override {
@@ -635,6 +654,7 @@ class TeeNode : public acero::MapNode {
635654
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
636655
FileSystemDatasetWriteOptions write_options_;
637656
std::atomic<int32_t> backpressure_counter_ = 0;
657+
std::unique_ptr<acero::util::SerialSequencingQueue> sequencer_;
638658
};
639659

640660
} // namespace

0 commit comments

Comments
 (0)