Skip to content

Commit b4fe200

Browse files
author
Rafał Hibner
committed
RecordBatchReader allow implicit ordering
1 parent f8b20f1 commit b4fe200

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

cpp/src/arrow/acero/options.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,11 @@ class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOpt
198198
public:
199199
/// Create an instance from values
200200
RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
201-
arrow::internal::Executor* io_executor = NULLPTR)
202-
: reader(std::move(reader)), io_executor(io_executor) {}
201+
arrow::internal::Executor* io_executor = NULLPTR,
202+
bool implicit_ordering = false)
203+
: reader(std::move(reader)),
204+
io_executor(io_executor),
205+
implicit_ordering(implicit_ordering) {}
203206

204207
/// \brief The RecordBatchReader which acts as the data source
205208
std::shared_ptr<RecordBatchReader> reader;
@@ -208,6 +211,8 @@ class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOpt
208211
///
209212
/// Defaults to the default I/O executor.
210213
arrow::internal::Executor* io_executor;
214+
215+
bool implicit_ordering{false};
211216
};
212217

213218
/// a source node that reads from an iterator of array vectors

cpp/src/arrow/acero/source_node.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,15 +407,18 @@ struct SchemaSourceNode : public SourceNode {
407407

408408
struct RecordBatchReaderSourceNode : public SourceNode {
409409
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
410-
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
411-
: SourceNode(plan, schema, generator) {}
410+
arrow::AsyncGenerator<std::optional<ExecBatch>> generator,
411+
Ordering ordering = Ordering::Unordered())
412+
: SourceNode(plan, schema, generator, ordering) {}
412413

413414
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
414415
const ExecNodeOptions& options) {
415416
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, kKindName));
416417
const auto& cast_options =
417418
checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
418419
auto& reader = cast_options.reader;
420+
Ordering ordering =
421+
cast_options.implicit_ordering ? Ordering::Implicit() : Ordering::Unordered();
419422
auto io_executor = cast_options.io_executor;
420423

421424
if (reader == nullptr) {
@@ -428,7 +431,7 @@ struct RecordBatchReaderSourceNode : public SourceNode {
428431

429432
ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor));
430433
return plan->EmplaceNode<RecordBatchReaderSourceNode>(plan, reader->schema(),
431-
generator);
434+
generator, ordering);
432435
}
433436

434437
static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(

0 commit comments

Comments
 (0)