# GH-48467: [C++][Parquet] Add configure to limit the row group size #48468
cpp/src/parquet/arrow/writer.cc:

```diff
@@ -395,15 +395,24 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
-    if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    if (!table.schema()->Equals(*schema_, false)) {
       return Status::Invalid("table schema does not match this writer's. table:'",
                              table.schema()->ToString(), "' this:'", schema_->ToString(),
                              "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    // max_row_group_bytes is applied only after the row group has accumulated data.
+    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
+      double avg_row_size =
+          row_group_writer_->total_buffered_bytes() * 1.0 / row_group_writer_->num_rows();
+      chunk_size = std::min(
+          chunk_size,
+          static_cast<int64_t>(this->properties().max_row_group_bytes() / avg_row_size));
```
**Contributor:** Will there be rows already written in the current row group? If so, shouldn't the remaining byte budget be used, e.g.:

```cpp
int64_t buffered_bytes = row_group_writer_->current_buffered_bytes();
double avg_row_bytes = buffered_bytes * 1.0 / group_rows;
chunk_size = std::min(
    chunk_size,
    static_cast<int64_t>((this->properties().max_row_group_bytes() - buffered_bytes) / avg_row_bytes));
```

**Author:** Actually each batch will be written to a new row group; we just use the last row group to estimate the average row size.

**Contributor:** I get it.
```diff
+    }
+    if (chunk_size <= 0 && table.num_rows() > 0) {
+      return Status::Invalid("rows per row_group must be greater than 0");
+    }
 
     auto WriteRowGroup = [&](int64_t offset, int64_t size) {
       RETURN_NOT_OK(NewRowGroup());
```
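As a reading aid, here is a minimal, self-contained sketch of the sizing arithmetic this hunk introduces. `CapChunkSize` is a hypothetical helper, not part of the PR; it mirrors the `avg_row_size` estimate used in `WriteTable` above.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper mirroring the WriteTable logic above: cap a requested
// chunk size (in rows) so a row group stays under a byte budget, using the
// average buffered (encoded and compressed) row size observed so far.
int64_t CapChunkSize(int64_t chunk_size, int64_t buffered_bytes,
                     int64_t buffered_rows, int64_t max_row_group_bytes) {
  if (buffered_rows <= 0) {
    return chunk_size;  // No data buffered yet, so nothing to estimate from.
  }
  double avg_row_size = static_cast<double>(buffered_bytes) / buffered_rows;
  return std::min(chunk_size,
                  static_cast<int64_t>(max_row_group_bytes / avg_row_size));
}
```

For example, 1,000,000 buffered bytes over 10,000 rows gives 100 bytes per row; with the default 128 MiB budget the cap works out to 1,342,177 rows, so the default `max_row_group_length` of 1 Mi rows would still bind first.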
```diff
@@ -442,12 +451,8 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     }
 
-    // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
     // Initialize a new buffered row group writer if necessary.
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
+    if (row_group_writer_ == nullptr || !row_group_writer_->buffered()) {
       RETURN_NOT_OK(NewBufferedRowGroup());
     }
```
```diff
@@ -480,17 +485,28 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    // Max number of rows allowed in a row group.
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
```
**Author:** Not sure whether we should validate that such a config value is positive.

**Member:** I think we should validate this in properties.h when it is being set?
```diff
+    // Max number of bytes allowed in a row group.
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t group_rows = row_group_writer_->num_rows();
+      int64_t batch_size =
+          std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
+      if (group_rows > 0) {
```
**Member:** Similar to my comment above, should we consider all written row groups as well to estimate the average row size?

**Author:** If we change to use all written row groups, then the first row group size can only be determined by `max_row_group_length`.
```diff
+        int64_t buffered_bytes = row_group_writer_->total_buffered_bytes();
+        double avg_row_size = buffered_bytes * 1.0 / group_rows;
+        batch_size = std::min(
+            batch_size,
+            static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_size));
+      }
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
+        // Current row group is full, write remaining rows in a new group.
         RETURN_NOT_OK(NewBufferedRowGroup());
       }
     }
```
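The loop above bounds each slice by two budgets at once: the remaining row count and the remaining byte budget of the current buffered group. A minimal sketch of that per-iteration computation, where `NextBatchSize` is a hypothetical stand-in for the inline logic (not a function in the PR):

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical distillation of the per-iteration sizing in WriteRecordBatch:
// the next slice is bounded by the remaining row budget and, once some rows
// are buffered, by the remaining byte budget of the current row group.
int64_t NextBatchSize(int64_t group_rows, int64_t buffered_bytes,
                      int64_t rows_remaining, int64_t max_row_group_length,
                      int64_t max_row_group_bytes) {
  int64_t batch_size =
      std::min(max_row_group_length - group_rows, rows_remaining);
  if (group_rows > 0) {
    double avg_row_size = static_cast<double>(buffered_bytes) / group_rows;
    batch_size = std::min(
        batch_size,
        static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_size));
  }
  // A non-positive result means the current group is full: flush it and
  // start a new buffered row group before writing more rows.
  return batch_size;
}
```

Note that once `buffered_bytes` reaches `max_row_group_bytes`, the subtraction goes non-positive and forces a flush, which is exactly what the `else if` branch in the diff handles.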
cpp/src/parquet/properties.h:
```diff
@@ -26,6 +26,7 @@
 #include "arrow/io/caching.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/compression.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/type_fwd.h"
 #include "parquet/encryption/encryption.h"
 #include "parquet/exception.h"
```
```diff
@@ -160,6 +161,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
 static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_BYTES = 128 * 1024 * 1024;
 static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
 static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
```
```diff
@@ -293,6 +295,7 @@ class PARQUET_EXPORT WriterProperties {
           dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
           write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
+          max_row_group_bytes_(DEFAULT_MAX_ROW_GROUP_BYTES),
           pagesize_(kDefaultDataPageSize),
           max_rows_per_page_(kDefaultMaxRowsPerPage),
           version_(ParquetVersion::PARQUET_2_6),
```
```diff
@@ -309,6 +312,7 @@ class PARQUET_EXPORT WriterProperties {
           dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
           write_batch_size_(properties.write_batch_size()),
           max_row_group_length_(properties.max_row_group_length()),
+          max_row_group_bytes_(properties.max_row_group_bytes()),
           pagesize_(properties.data_pagesize()),
           max_rows_per_page_(properties.max_rows_per_page()),
           version_(properties.version()),
```
```diff
@@ -414,10 +418,20 @@ class PARQUET_EXPORT WriterProperties {
     /// Specify the max number of rows to put in a single row group.
     /// Default 1Mi rows.
     Builder* max_row_group_length(int64_t max_row_group_length) {
+      ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";
       max_row_group_length_ = max_row_group_length;
       return this;
     }
 
+    /// Specify the max number of bytes to put in a single row group.
```
**Member:** The size is after encoding and compression, right? It would be good to document this.
```diff
+    /// The size is estimated based on encoded and compressed data.
+    /// Default 128MB.
+    Builder* max_row_group_bytes(int64_t max_row_group_bytes) {
+      ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive";
+      max_row_group_bytes_ = max_row_group_bytes;
+      return this;
+    }
+
     /// Specify the data page size.
     /// Default 1MB.
     Builder* data_pagesize(int64_t pg_size) {
```
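A short usage sketch of the builder API this hunk adds; the byte value is illustrative, and the chained style follows the existing `WriterProperties::Builder` convention.

```cpp
#include <memory>

#include "parquet/properties.h"

// Cap row groups at 1 Mi rows or 64 MiB of encoded data, whichever is
// reached first.
std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  parquet::WriterProperties::Builder builder;
  return builder.max_row_group_length(1024 * 1024)
      ->max_row_group_bytes(64 * 1024 * 1024)
      ->build();
}
```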
```diff
@@ -779,11 +793,12 @@ class PARQUET_EXPORT WriterProperties {
 
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
-          pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_,
-          size_statistics_level_, std::move(file_encryption_properties_),
-          default_column_properties_, column_properties, data_page_version_,
-          store_decimal_as_integer_, std::move(sorting_columns_),
-          content_defined_chunking_enabled_, content_defined_chunking_options_));
+          max_row_group_bytes_, pagesize_, max_rows_per_page_, version_, created_by_,
+          page_checksum_enabled_, size_statistics_level_,
+          std::move(file_encryption_properties_), default_column_properties_,
+          column_properties, data_page_version_, store_decimal_as_integer_,
+          std::move(sorting_columns_), content_defined_chunking_enabled_,
+          content_defined_chunking_options_));
     }
 
    private:
```
```diff
@@ -793,6 +808,7 @@ class PARQUET_EXPORT WriterProperties {
     int64_t dictionary_pagesize_limit_;
     int64_t write_batch_size_;
     int64_t max_row_group_length_;
+    int64_t max_row_group_bytes_;
     int64_t pagesize_;
     int64_t max_rows_per_page_;
     ParquetVersion::type version_;
```
```diff
@@ -828,6 +844,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t max_row_group_length() const { return max_row_group_length_; }
 
+  inline int64_t max_row_group_bytes() const { return max_row_group_bytes_; }
+
   inline int64_t data_pagesize() const { return pagesize_; }
 
   inline int64_t max_rows_per_page() const { return max_rows_per_page_; }
```
```diff
@@ -946,9 +964,10 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(
       MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
-      int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page,
-      ParquetVersion::type version, const std::string& created_by,
-      bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level,
+      int64_t max_row_group_length, int64_t max_row_group_bytes, int64_t pagesize,
+      int64_t max_rows_per_page, ParquetVersion::type version,
+      const std::string& created_by, bool page_write_checksum_enabled,
+      SizeStatisticsLevel size_statistics_level,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties,
```
```diff
@@ -959,6 +978,7 @@ class PARQUET_EXPORT WriterProperties {
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
+        max_row_group_bytes_(max_row_group_bytes),
         pagesize_(pagesize),
         max_rows_per_page_(max_rows_per_page),
         parquet_data_page_version_(data_page_version),
```
```diff
@@ -978,6 +998,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetDataPageVersion parquet_data_page_version_;
```
**Member:** `row_group_writer_->num_rows() > 0` can only happen when the current row group writer is in buffered mode. Usually users calling `WriteTable` will never use buffered mode, so this approach seems not to work in the majority of cases.

**Member:** Instead, can we gather this information from all written row groups (if available)?

**Author:** @wgtmac If the user uses the static `WriteTable` function, the arrow `FileWriter` is always recreated and we cannot gather the old written row groups (arrow/cpp/src/parquet/arrow/writer.cc, lines 591 to 601 in 8040f2a). If the user uses the internal `WriteTable` function, we can get `avg_row_bytes` from the last `row_group_writer_` or by gathering all previous row group writers (arrow/cpp/src/parquet/arrow/writer.cc, line 394 in 8040f2a).
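For context on the distinction the author draws, here is a hedged sketch of the two call paths; `WriteWholeTable` and `WriteInChunks` are illustrative names, and the signatures are abbreviated from the public `parquet/arrow/writer.h` headers.

```cpp
#include <memory>

#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "parquet/arrow/writer.h"

// Path 1: the static helper. A FileWriter is created and closed inside the
// call, so no row-group history survives across calls and the byte-based
// estimate has nothing earlier to draw on.
arrow::Status WriteWholeTable(const arrow::Table& table,
                              std::shared_ptr<arrow::io::OutputStream> sink) {
  return parquet::arrow::WriteTable(table, arrow::default_memory_pool(), sink,
                                    /*chunk_size=*/64 * 1024);
}

// Path 2: the member function. A long-lived FileWriter keeps its
// row_group_writer_ state, so repeated WriteTable calls on the same writer
// can reuse the previous group's statistics.
arrow::Status WriteInChunks(const std::shared_ptr<arrow::Schema>& schema,
                            const arrow::Table& table,
                            std::shared_ptr<arrow::io::OutputStream> sink) {
  ARROW_ASSIGN_OR_RAISE(
      auto writer, parquet::arrow::FileWriter::Open(
                       *schema, arrow::default_memory_pool(), sink));
  ARROW_RETURN_NOT_OK(writer->WriteTable(table, /*chunk_size=*/64 * 1024));
  return writer->Close();
}
```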