Skip to content

Commit a50e12d

Browse files
author
Rafał Hibner
committed
RecordBatchReader allow implicit ordering
1 parent 5074b06 commit a50e12d

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

cpp/src/arrow/acero/options.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,16 +200,21 @@ class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOpt
200200
public:
201201
/// Create an instance from values
202202
RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
203-
arrow::internal::Executor* io_executor = NULLPTR)
204-
: reader(std::move(reader)), io_executor(io_executor) {}
203+
arrow::internal::Executor* io_executor = NULLPTR,
204+
bool implicit_ordering=false)
205+
: reader(std::move(reader)), io_executor(io_executor),implicit_ordering(implicit_ordering) {}
205206

206207
/// \brief The RecordBatchReader which acts as the data source
207208
std::shared_ptr<RecordBatchReader> reader;
208209

210+
209211
/// \brief The executor to use for the reader
210212
///
211213
/// Defaults to the default I/O executor.
212214
arrow::internal::Executor* io_executor;
215+
216+
217+
bool implicit_ordering{false};
213218
};
214219

215220
/// a source node that reads from an iterator of array vectors

cpp/src/arrow/acero/source_node.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,15 +406,17 @@ struct SchemaSourceNode : public SourceNode {
406406

407407
struct RecordBatchReaderSourceNode : public SourceNode {
408408
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
409-
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
410-
: SourceNode(plan, schema, generator) {}
409+
arrow::AsyncGenerator<std::optional<ExecBatch>> generator,
410+
Ordering ordering = Ordering::Unordered())
411+
: SourceNode(plan, schema, generator, ordering) {}
411412

412413
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
413414
const ExecNodeOptions& options) {
414415
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, kKindName));
415416
const auto& cast_options =
416417
checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
417418
auto& reader = cast_options.reader;
419+
Ordering ordering = cast_options.implicit_ordering?Ordering::Implicit():Ordering::Unordered();
418420
auto io_executor = cast_options.io_executor;
419421

420422
if (reader == nullptr) {
@@ -427,7 +429,7 @@ struct RecordBatchReaderSourceNode : public SourceNode {
427429

428430
ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor));
429431
return plan->EmplaceNode<RecordBatchReaderSourceNode>(plan, reader->schema(),
430-
generator);
432+
generator, ordering);
431433
}
432434

433435
static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(

0 commit comments

Comments
 (0)