diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 0831fb62675..962c9c0e30b 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -378,19 +378,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
 template <>
 struct test_traits<::arrow::StringType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 template <>
 struct test_traits<::arrow::BinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 template <>
 struct test_traits<::arrow::FixedSizeBinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 const std::string test_traits<::arrow::StringType>::value("Test");  // NOLINT
@@ -5794,6 +5794,72 @@ TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteRecordBatchFlushRowGroupByBufferedSize) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max bytes in a row group to 10 so that each batch produces a new group.
+  auto writer_properties = WriterProperties::Builder().max_row_group_bytes(10)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
+
+  // Create writer to write data via RecordBatch.
+  ASSERT_OK_AND_ASSIGN(auto arrow_writer, parquet::arrow::FileWriter::Open(
+                                              *schema, pool, sink, writer_properties,
+                                              arrow_writer_properties));
+  // NewBufferedRowGroup() is not called explicitly; it will be called
+  // inside WriteRecordBatch().
+  for (int i = 0; i < 5; ++i) {
+    auto record_batch =
+        gen.BatchOf({::arrow::field("a", ::arrow::int64())}, /*length=*/1);
+    ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(5, file_metadata->num_row_groups());
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_EQ(1, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
+TEST(TestArrowReadWrite, WriteTableFlushRowGroupByBufferedSize) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max bytes in a row group to 100 so that the first table generates one
+  // row group and the second table generates 5 row groups.
+  auto writer_properties = WriterProperties::Builder().max_row_group_bytes(100)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  auto table = ::arrow::Table::Make(
+      schema, {::arrow::ArrayFromJSON(::arrow::int64(), R"([1, 2, 3, 4, 5])")});
+  ASSERT_OK_AND_ASSIGN(auto arrow_writer, parquet::arrow::FileWriter::Open(
+                                              *schema, pool, sink, writer_properties,
+                                              arrow_writer_properties));
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, 5));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(6, file_metadata->num_row_groups());
+  EXPECT_EQ(5, file_metadata->RowGroup(0)->num_rows());
+  for (int i = 1; i < 6; ++i) {
+    EXPECT_EQ(1, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
 TEST(TestArrowReadWrite, MultithreadedWrite) {
   const int num_columns = 20;
   const int num_rows = 1000;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4b2b06e5e09..c85628a285e 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -395,15 +395,24 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
-    if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    if (!table.schema()->Equals(*schema_, false)) {
       return Status::Invalid("table schema does not match this writer's. table:'",
                              table.schema()->ToString(), "' this:'",
                              schema_->ToString(), "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    // max_row_group_bytes is applied only after the row group has accumulated data.
+    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
+      double avg_row_size =
+          row_group_writer_->total_buffered_bytes() * 1.0 / row_group_writer_->num_rows();
+      chunk_size = std::min(
+          chunk_size,
+          static_cast<int64_t>(this->properties().max_row_group_bytes() / avg_row_size));
+    }
+    if (chunk_size <= 0 && table.num_rows() > 0) {
+      return Status::Invalid("rows per row_group must be greater than 0");
+    }
 
     auto WriteRowGroup = [&](int64_t offset, int64_t size) {
       RETURN_NOT_OK(NewRowGroup());
@@ -442,12 +451,8 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     }
 
-    // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
     // Initialize a new buffered row group writer if necessary.
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
+    if (row_group_writer_ == nullptr || !row_group_writer_->buffered()) {
       RETURN_NOT_OK(NewBufferedRowGroup());
     }
 
@@ -480,17 +485,28 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    // Max number of rows allowed in a row group.
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    // Max number of bytes allowed in a row group.
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t group_rows = row_group_writer_->num_rows();
+      int64_t batch_size =
+          std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
+      if (group_rows > 0) {
+        int64_t buffered_bytes = row_group_writer_->total_buffered_bytes();
+        double avg_row_size = buffered_bytes * 1.0 / group_rows;
+        batch_size = std::min(
+            batch_size,
+            static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_size));
+      }
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
+        // The current row group is full; write the remaining rows in a new group.
         RETURN_NOT_OK(NewBufferedRowGroup());
       }
     }
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 8ec8796ffd1..9b1491ecf90 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -111,9 +111,9 @@ class PARQUET_EXPORT FileWriter {
   /// Multiple RecordBatches can be written into the same row group
   /// through this method.
   ///
-  /// WriterProperties.max_row_group_length() is respected and a new
-  /// row group will be created if the current row group exceeds the
-  /// limit.
+  /// WriterProperties.max_row_group_length() and WriterProperties.max_row_group_bytes()
+  /// are respected and a new row group will be created if the current row group exceeds
+  /// either limit.
   ///
   /// Batches get flushed to the output stream once NewBufferedRowGroup()
   /// or Close() is called.
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index ddec2c0a560..a24c8eddb8d 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -68,6 +68,12 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
   return contents_->total_compressed_bytes_written();
 }
 
+int64_t RowGroupWriter::total_buffered_bytes() const {
+  return contents_->total_compressed_bytes() +
+         contents_->total_compressed_bytes_written() +
+         contents_->EstimatedBufferedValueBytes();
+}
+
 bool RowGroupWriter::buffered() const { return contents_->buffered(); }
 
 int RowGroupWriter::current_column() { return contents_->current_column(); }
@@ -195,6 +201,20 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
     return total_compressed_bytes_written;
   }
 
+  int64_t EstimatedBufferedValueBytes() const override {
+    if (closed_) {
+      return 0;
+    }
+    int64_t estimated_buffered_value_bytes = 0;
+    for (size_t i = 0; i < column_writers_.size(); i++) {
+      if (column_writers_[i]) {
+        estimated_buffered_value_bytes +=
+            column_writers_[i]->estimated_buffered_value_bytes();
+      }
+    }
+    return estimated_buffered_value_bytes;
+  }
+
   bool buffered() const override { return buffered_row_group_; }
 
   void Close() override {
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index d5ea1d7c98a..872d5946b10 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -58,6 +58,8 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t total_compressed_bytes() const = 0;
     /// \brief total compressed bytes written by the page writer
    virtual int64_t total_compressed_bytes_written() const = 0;
+    /// \brief estimated size of the values that have not been written to a page yet
+    virtual int64_t EstimatedBufferedValueBytes() const = 0;
 
     virtual bool buffered() const = 0;
   };
@@ -99,6 +101,9 @@ class PARQUET_EXPORT RowGroupWriter {
   int64_t total_compressed_bytes() const;
   /// \brief total compressed bytes written by the page writer
   int64_t total_compressed_bytes_written() const;
+  /// \brief total bytes buffered by this row group writer, including compressed
+  /// bytes held by the page writer and uncompressed bytes in the value buffer
+  int64_t total_buffered_bytes() const;
 
   /// Returns whether the current RowGroupWriter is in the buffered mode and is created
   /// by calling ParquetFileWriter::AppendBufferedRowGroup.
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index eb5aee29695..f9d221ab099 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -26,6 +26,7 @@ #include "arrow/io/caching.h" #include "arrow/type_fwd.h" #include "arrow/util/compression.h" +#include "arrow/util/logging.h" #include "arrow/util/type_fwd.h" #include "parquet/encryption/encryption.h" #include "parquet/exception.h" @@ -160,6 +161,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024; +static constexpr int64_t DEFAULT_MAX_ROW_GROUP_BYTES = 128 * 1024 * 1024; static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true; static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096; static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN; @@ -293,6 +295,7 @@ class PARQUET_EXPORT WriterProperties { dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT), write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH), + max_row_group_bytes_(DEFAULT_MAX_ROW_GROUP_BYTES), pagesize_(kDefaultDataPageSize), max_rows_per_page_(kDefaultMaxRowsPerPage), version_(ParquetVersion::PARQUET_2_6), @@ -309,6 +312,7 @@ class PARQUET_EXPORT WriterProperties { dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()), write_batch_size_(properties.write_batch_size()), max_row_group_length_(properties.max_row_group_length()), + max_row_group_bytes_(properties.max_row_group_bytes()), pagesize_(properties.data_pagesize()), max_rows_per_page_(properties.max_rows_per_page()), version_(properties.version()), @@ -414,10 +418,20 @@ class PARQUET_EXPORT WriterProperties { /// Specify the max number of rows to put in a single row group. /// Default 1Mi rows. Builder* max_row_group_length(int64_t max_row_group_length) { + ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive"; max_row_group_length_ = max_row_group_length; return this; } + /// Specify the max number of bytes to put in a single row group. + /// The size is estimated based on encoded and compressed data. + /// Default 128MB. + Builder* max_row_group_bytes(int64_t max_row_group_bytes) { + ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive"; + max_row_group_bytes_ = max_row_group_bytes; + return this; + } + /// Specify the data page size. /// Default 1MB. 
 Builder* data_pagesize(int64_t pg_size) {
@@ -779,11 +793,12 @@ class PARQUET_EXPORT WriterProperties {
 
     return std::shared_ptr<WriterProperties>(new WriterProperties(
         pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
-        pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_,
-        size_statistics_level_, std::move(file_encryption_properties_),
-        default_column_properties_, column_properties, data_page_version_,
-        store_decimal_as_integer_, std::move(sorting_columns_),
-        content_defined_chunking_enabled_, content_defined_chunking_options_));
+        max_row_group_bytes_, pagesize_, max_rows_per_page_, version_, created_by_,
+        page_checksum_enabled_, size_statistics_level_,
+        std::move(file_encryption_properties_), default_column_properties_,
+        column_properties, data_page_version_, store_decimal_as_integer_,
+        std::move(sorting_columns_), content_defined_chunking_enabled_,
+        content_defined_chunking_options_));
   }
 
  private:
@@ -793,6 +808,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetVersion::type version_;
@@ -828,6 +844,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t max_row_group_length() const { return max_row_group_length_; }
 
+  inline int64_t max_row_group_bytes() const { return max_row_group_bytes_; }
+
   inline int64_t data_pagesize() const { return pagesize_; }
 
   inline int64_t max_rows_per_page() const { return max_rows_per_page_; }
@@ -946,9 +964,10 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(
       MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
-      int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page,
-      ParquetVersion::type version, const std::string& created_by,
-      bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level,
+      int64_t max_row_group_length, int64_t max_row_group_bytes, int64_t pagesize,
+      int64_t max_rows_per_page, ParquetVersion::type version,
+      const std::string& created_by, bool page_write_checksum_enabled,
+      SizeStatisticsLevel size_statistics_level,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties,
@@ -959,6 +978,7 @@
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
+        max_row_group_bytes_(max_row_group_bytes),
         pagesize_(pagesize),
         max_rows_per_page_(max_rows_per_page),
         parquet_data_page_version_(data_page_version),
@@ -978,6 +998,7 @@
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetDataPageVersion parquet_data_page_version_;
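
Usage note: the sketch below shows how the new property is intended to be used end to
end. It relies only on APIs that appear in this diff (WriterProperties::Builder::
max_row_group_bytes(), parquet::arrow::FileWriter::Open(), WriteTable()); the helper
name WriteWithByteLimit and the 1 MiB cap are illustrative, not part of the change.
As the writer.cc hunks show, the writer derives an average row size from the bytes
already buffered in the current row group; for example, 5 buffered rows totaling
100 bytes give avg_row_size = 20, so with max_row_group_bytes = 1024 at most
(1024 - 100) / 20 = 46 more rows are admitted before a new row group is started.

#include <memory>
#include <utility>

#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"

// Hypothetical helper: write an Arrow table with a byte cap on each row group.
arrow::Status WriteWithByteLimit(const arrow::Table& table,
                                 std::shared_ptr<arrow::io::OutputStream> sink) {
  // Cap each row group at ~1 MiB of estimated (encoded + compressed) data;
  // max_row_group_length still applies as an independent row-count cap.
  std::shared_ptr<parquet::WriterProperties> writer_properties =
      parquet::WriterProperties::Builder().max_row_group_bytes(1 << 20)->build();
  ARROW_ASSIGN_OR_RAISE(
      std::unique_ptr<parquet::arrow::FileWriter> writer,
      parquet::arrow::FileWriter::Open(*table.schema(), arrow::default_memory_pool(),
                                       std::move(sink), writer_properties,
                                       parquet::default_arrow_writer_properties()));
  // WriteTable() shrinks its effective chunk size once the current row group has
  // buffered data, so groups are flushed near the configured byte limit.
  ARROW_RETURN_NOT_OK(writer->WriteTable(table, /*chunk_size=*/1024));
  return writer->Close();
}

Because the estimate only kicks in after a row group has accumulated data (see the
"max_row_group_bytes is applied only after the row group has accumulated data" comment
in writer.cc), the first rows of a group can overshoot the limit slightly; the cap is
a soft target rather than a hard guarantee.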