Skip to content
1 change: 1 addition & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ build/
Testing/
build-support/boost_*
vcpkg_installed/
_deps/

# Build directories created by Clion
cmake-build-*/
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ set(PARQUET_SRCS
platform.cc
printer.cc
properties.cc
row_selection.cc
schema.cc
size_statistics.cc
statistics.cc
Expand Down Expand Up @@ -393,7 +394,8 @@ add_parquet_test(reader-test
level_conversion_test.cc
column_scanner_test.cc
reader_test.cc
stream_reader_test.cc)
stream_reader_test.cc
row_selection_test.cc)

add_parquet_test(writer-test
SOURCES
Expand Down
233 changes: 233 additions & 0 deletions cpp/src/parquet/row_selection.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/row_selection.h"

#include "arrow/util/bitmap_ops.h"
#include "arrow/util/unreachable.h"
#include "parquet/exception.h"

namespace parquet {

class IteratorImpl : public RowSelection::Iterator {
public:
explicit IteratorImpl(const RowSelection& ranges, size_t batch_size = 1)
: ranges_(ranges.ranges_), index_(0), batch_size_(batch_size) {}

~IteratorImpl() override = default;

::arrow::util::span<const RowSelection::IntervalRange> NextRanges() override {
if (index_ >= ranges_.size()) {
return {};
}
// Return up to batch_size_ ranges
size_t remaining = ranges_.size() - index_;
size_t count = std::min(batch_size_, remaining);
auto result = ::arrow::util::span<const RowSelection::IntervalRange>(
ranges_.data() + index_, count);
index_ += count;
return result;
}

private:
const std::vector<RowSelection::IntervalRange>& ranges_;
size_t index_;
size_t batch_size_;
};

std::unique_ptr<RowSelection::Iterator> RowSelection::NewIterator() const {
return std::make_unique<IteratorImpl>(*this);
}

void RowSelection::Validate() const {
int64_t last_end = -1;
for (const auto& interval : ranges_) {
if (interval.start <= last_end) {
throw ParquetException("Row ranges are not in ascending order");
}
if (interval.length <= 0) {
throw ParquetException("Invalid interval range: length must be positive");
}
last_end = interval.start + interval.length - 1;
}
}

int64_t RowSelection::row_count() const {
int64_t count = 0;
for (const auto& interval : ranges_) {
count += interval.length;
}
return count;
}

RowSelection RowSelection::Intersect(const RowSelection& lhs, const RowSelection& rhs) {
RowSelection result;

// Use iterators to get batches
auto lhs_iter = lhs.NewIterator();
auto rhs_iter = rhs.NewIterator();

auto lhs_batch = lhs_iter->NextRanges();
auto rhs_batch = rhs_iter->NextRanges();
size_t lhs_idx = 0;
size_t rhs_idx = 0;

while (!lhs_batch.empty() && !rhs_batch.empty()) {
// Get current ranges from batches
const auto& left = lhs_batch[lhs_idx];
const auto& right = rhs_batch[rhs_idx];

int64_t left_end = left.start + left.length - 1;
int64_t right_end = right.start + right.length - 1;

// Find overlapping region
int64_t start = std::max(left.start, right.start);
int64_t end = std::min(left_end, right_end);

// If there is an overlap, add it to results
if (start <= end) {
result.ranges_.push_back(IntervalRange{start, end - start + 1});
}

// Advance the index with smaller end
if (left_end < right_end) {
lhs_idx++;
if (lhs_idx >= lhs_batch.size()) {
lhs_batch = lhs_iter->NextRanges();
lhs_idx = 0;
}
} else {
rhs_idx++;
if (rhs_idx >= rhs_batch.size()) {
rhs_batch = rhs_iter->NextRanges();
rhs_idx = 0;
}
}
}

return result;
}

RowSelection RowSelection::Union(const RowSelection& lhs, const RowSelection& rhs) {
RowSelection result;

if (lhs.ranges_.empty()) {
return rhs;
}
if (rhs.ranges_.empty()) {
return lhs;
}

// Use iterators to get batches
auto lhs_iter = lhs.NewIterator();
auto rhs_iter = rhs.NewIterator();

auto lhs_batch = lhs_iter->NextRanges();
auto rhs_batch = rhs_iter->NextRanges();
size_t lhs_idx = 0;
size_t rhs_idx = 0;

// Start with whichever range has the smaller start
IntervalRange current;
if (lhs_batch[0].start <= rhs_batch[0].start) {
current = lhs_batch[lhs_idx++];
if (lhs_idx >= lhs_batch.size()) {
lhs_batch = lhs_iter->NextRanges();
lhs_idx = 0;
}
} else {
current = rhs_batch[rhs_idx++];
if (rhs_idx >= rhs_batch.size()) {
rhs_batch = rhs_iter->NextRanges();
rhs_idx = 0;
}
}

while (!lhs_batch.empty() || !rhs_batch.empty()) {
IntervalRange next;

if (rhs_batch.empty()) {
// Only lhs ranges remain
next = lhs_batch[lhs_idx++];
if (lhs_idx >= lhs_batch.size()) {
lhs_batch = lhs_iter->NextRanges();
lhs_idx = 0;
}
} else if (lhs_batch.empty()) {
// Only rhs ranges remain
next = rhs_batch[rhs_idx++];
if (rhs_idx >= rhs_batch.size()) {
rhs_batch = rhs_iter->NextRanges();
rhs_idx = 0;
}
} else {
// Both have ranges - pick the one with smaller start
const auto& left = lhs_batch[lhs_idx];
const auto& right = rhs_batch[rhs_idx];

if (left.start <= right.start) {
next = left;
lhs_idx++;
if (lhs_idx >= lhs_batch.size()) {
lhs_batch = lhs_iter->NextRanges();
lhs_idx = 0;
}
} else {
next = right;
rhs_idx++;
if (rhs_idx >= rhs_batch.size()) {
rhs_batch = rhs_iter->NextRanges();
rhs_idx = 0;
}
}
}

int64_t current_end = current.start + current.length - 1;
if (current_end + 1 >= next.start) {
// Concatenate overlapping or adjacent ranges
int64_t next_end = next.start + next.length - 1;
int64_t new_end = std::max(current_end, next_end);
current.length = new_end - current.start + 1;
} else {
// Gap between current and next range
result.ranges_.push_back(current);
current = next;
}
}

result.ranges_.push_back(current);
return result;
}

RowSelection RowSelection::MakeSingle(int64_t start, int64_t end) {
RowSelection rowSelection;
rowSelection.ranges_.push_back(IntervalRange{start, end - start + 1});
return rowSelection;
}

RowSelection RowSelection::FromIntervals(::arrow::util::span<const IntervalRange> intervals) {
RowSelection rowSelection;
rowSelection.ranges_.reserve(intervals.size());
rowSelection.ranges_.insert(rowSelection.ranges_.end(), intervals.begin(), intervals.end());
return rowSelection;
}

RowSelection RowSelection::FromIntervals(const std::vector<IntervalRange>& intervals) {
return FromIntervals(::arrow::util::span<const IntervalRange>(intervals));
}

} // namespace parquet
79 changes: 79 additions & 0 deletions cpp/src/parquet/row_selection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <vector>

#include "arrow/util/span.h"
#include "parquet/platform.h"

namespace parquet {

/// RowSelection is a collection of non-overlapping and ascendingly ordered row ranges.
class PARQUET_EXPORT RowSelection {
public:
/// \brief EXPERIMENTAL: A range of contiguous rows represented by an interval.
struct IntervalRange {
/// Start row of the range (inclusive).
int64_t start;
/// Number of rows in the range.
int64_t length;
};

/// \brief EXPERIMENTAL: An iterator for accessing row ranges in batches.
class Iterator {
public:
virtual ~Iterator() = default;
/// \brief Get the next batch of ranges.
/// Returns an empty span when exhausted.
virtual ::arrow::util::span<const IntervalRange> NextRanges() = 0;
};

/// \brief EXPERIMENTAL: Create a new iterator for accessing row ranges in order.
std::unique_ptr<Iterator> NewIterator() const;

/// \brief EXPERIMENTAL: Validate the row ranges.
/// \throws ParquetException if the row ranges are not in ascending order or
/// overlapped.
void Validate() const;

/// \brief EXPERIMENTAL: Get the total number of rows in the row ranges.
int64_t row_count() const;

/// \brief EXPERIMENTAL: Compute the intersection of two row ranges.
static RowSelection Intersect(const RowSelection& lhs, const RowSelection& rhs);

/// \brief EXPERIMENTAL: Compute the union of two row ranges.
static RowSelection Union(const RowSelection& lhs, const RowSelection& rhs);

/// \brief EXPERIMENTAL: Make a single row range of [start, end].
static RowSelection MakeSingle(int64_t start, int64_t end);

/// \brief EXPERIMENTAL: Make a row range from a list of intervals.
static RowSelection FromIntervals(::arrow::util::span<const IntervalRange> intervals);

/// \brief EXPERIMENTAL: Make a row range from a vector of intervals.
static RowSelection FromIntervals(const std::vector<IntervalRange>& intervals);

private:
friend class IteratorImpl;
std::vector<IntervalRange> ranges_;
};

} // namespace parquet
Loading