Skip to content

Commit b2932ee

Browse files
author
Rafał Hibner
committed
Initial implementation of acero pipes
1 parent 31747f0 commit b2932ee

File tree

8 files changed

+826
-0
lines changed

8 files changed

+826
-0
lines changed

cpp/src/arrow/acero/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ set(ARROW_ACERO_SRCS
4848
time_series_util.cc
4949
tpch_node.cc
5050
union_node.cc
51+
pipe_node.cc
5152
util.cc)
5253

5354
append_runtime_avx2_src(ARROW_ACERO_SRCS bloom_filter_avx2.cc)
@@ -167,6 +168,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc)
167168

168169
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc)
169170
add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc)
171+
add_arrow_acero_test(pipe_node_test SOURCES pipe_node_test.cc)
170172

171173
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
172174
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)

cpp/src/arrow/acero/exec_plan.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,6 +1115,7 @@ void RegisterSinkNode(ExecFactoryRegistry*);
11151115
void RegisterHashJoinNode(ExecFactoryRegistry*);
11161116
void RegisterAsofJoinNode(ExecFactoryRegistry*);
11171117
void RegisterSortedMergeNode(ExecFactoryRegistry*);
1118+
void RegisterPipeNodes(ExecFactoryRegistry* registry);
11181119

11191120
} // namespace internal
11201121

@@ -1134,6 +1135,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
11341135
internal::RegisterHashJoinNode(this);
11351136
internal::RegisterAsofJoinNode(this);
11361137
internal::RegisterSortedMergeNode(this);
1138+
internal::RegisterPipeNodes(this);
11371139
}
11381140

11391141
Result<Factory> GetFactory(const std::string& factory_name) override {

cpp/src/arrow/acero/options.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <string>
2424
#include <vector>
2525

26+
#include "arrow/acero/exec_plan.h"
2627
#include "arrow/acero/type_fwd.h"
2728
#include "arrow/acero/visibility.h"
2829
#include "arrow/compute/api_aggregate.h"
@@ -865,6 +866,30 @@ class ARROW_ACERO_EXPORT PivotLongerNodeOptions : public ExecNodeOptions {
865866
std::vector<std::string> measurement_field_names;
866867
};
867868

869+
/// \brief Options for an experimental node that enables multiple sink exec nodes
870+
///
871+
/// Note, this API is experimental and will change in the future
872+
///
873+
/// This node forwards each exec batch to its output and also provides a number of
874+
/// additional source nodes for additional acero pipelines.
875+
class ARROW_ACERO_EXPORT PipeSourceNodeOptions : public ExecNodeOptions {
876+
public:
877+
PipeSourceNodeOptions(std::string pipe_name, std::shared_ptr<Schema> output_schema)
878+
: pipe_name(std::move(pipe_name)), output_schema(std::move(output_schema)) {}
879+
880+
/// \brief Name of the pipe this source node refers to
881+
std::string pipe_name;
882+
std::shared_ptr<Schema> output_schema;
883+
};
884+
885+
class ARROW_ACERO_EXPORT PipeSinkNodeOptions : public ExecNodeOptions {
886+
public:
887+
PipeSinkNodeOptions(std::string pipe_name) : pipe_name(std::move(pipe_name)) {}
888+
889+
/// \brief Name of the pipe this sink node refers to
890+
std::string pipe_name;
891+
};
892+
868893
/// @}
869894

870895
} // namespace acero

0 commit comments

Comments
 (0)