From 9ba01dcd67b35a54605a719e9a1ad05322eacf98 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 13 Dec 2025 10:18:13 -0700 Subject: [PATCH 1/4] make shuffle writer buffer size configurable --- .../scala/org/apache/comet/CometConf.scala | 11 ++++++ native/core/src/execution/planner.rs | 2 + .../src/execution/shuffle/shuffle_writer.rs | 39 +++++++++++++++---- native/proto/src/proto/operator.proto | 3 ++ .../shuffle/CometNativeShuffleWriter.scala | 1 + 5 files changed, 49 insertions(+), 7 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 7eaae60552..ad79183214 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -441,6 +441,17 @@ object CometConf extends ShimCometConf { .intConf .createWithDefault(8192) + val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Int] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize") + .category(CATEGORY_SHUFFLE) + .doc("Size of the write buffer in bytes used by the native shuffle writer when writing " + + "shuffle data to disk. Larger values may improve write performance by reducing " + + "the number of system calls, but will use more memory. " + + "The default is 1MB which provides a good balance between performance and memory usage.") + .intConf + .checkValue(v => v > 0, "Write buffer size must be positive") + .createWithDefault(1024 * 1024) + val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf( "spark.comet.shuffle.preferDictionary.ratio") .category(CATEGORY_SHUFFLE) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 712bedc2df..67b2523be3 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1244,6 +1244,7 @@ impl PhysicalPlanner { ))), }?; + let write_buffer_size = writer.write_buffer_size as usize; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, @@ -1251,6 +1252,7 @@ impl PhysicalPlanner { writer.output_data_file.clone(), writer.output_index_file.clone(), writer.tracing_enabled, + write_buffer_size, )?); Ok(( diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 4d395880c2..44a3cd67aa 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -78,10 +78,13 @@ pub struct ShuffleWriterExec { /// The compression codec to use when compressing shuffle blocks codec: CompressionCodec, tracing_enabled: bool, + /// Size of the write buffer in bytes + write_buffer_size: usize, } impl ShuffleWriterExec { /// Create a new ShuffleWriterExec + #[allow(clippy::too_many_arguments)] pub fn try_new( input: Arc, partitioning: CometPartitioning, @@ -89,6 +92,7 @@ impl ShuffleWriterExec { output_data_file: String, output_index_file: String, tracing_enabled: bool, + write_buffer_size: usize, ) -> Result { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), @@ -106,6 +110,7 @@ impl ShuffleWriterExec { cache, codec, tracing_enabled, + write_buffer_size, }) } } @@ -169,6 +174,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.output_data_file.clone(), self.output_index_file.clone(), self.tracing_enabled, + self.write_buffer_size, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -195,6 +201,7 @@ impl ExecutionPlan for ShuffleWriterExec { context, self.codec.clone(), self.tracing_enabled, + self.write_buffer_size, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -214,6 +221,7 @@ async fn external_shuffle( context: Arc, codec: CompressionCodec, tracing_enabled: bool, + write_buffer_size: usize, ) -> Result { with_trace_async("external_shuffle", tracing_enabled, || async { let schema = input.schema(); @@ -227,6 +235,7 @@ async fn external_shuffle( metrics, context.session_config().batch_size(), codec, + write_buffer_size, )?) } _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( @@ -240,6 +249,7 @@ async fn external_shuffle( context.session_config().batch_size(), codec, tracing_enabled, + write_buffer_size, )?), }; @@ -331,6 +341,8 @@ struct MultiPartitionShuffleRepartitioner { /// Reservation for repartitioning reservation: MemoryReservation, tracing_enabled: bool, + /// Size of the write buffer in bytes + write_buffer_size: usize, } #[derive(Default)] @@ -362,6 +374,7 @@ impl MultiPartitionShuffleRepartitioner { batch_size: usize, codec: CompressionCodec, tracing_enabled: bool, + write_buffer_size: usize, ) -> Result { let num_output_partitions = partitioning.partition_count(); assert_ne!( @@ -407,6 +420,7 @@ impl MultiPartitionShuffleRepartitioner { batch_size, reservation, tracing_enabled, + write_buffer_size, }) } @@ -654,8 +668,10 @@ impl MultiPartitionShuffleRepartitioner { output_data: &mut BufWriter, encode_time: &Time, write_time: &Time, + write_buffer_size: usize, ) -> Result<()> { - let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data); + let mut buf_batch_writer = + BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size); for batch in partition_iter { let batch = batch?; buf_batch_writer.write(&batch, encode_time, write_time)?; @@ -714,7 +730,12 @@ impl MultiPartitionShuffleRepartitioner { for partition_id in 0..num_output_partitions { let partition_writer = &mut self.partition_writers[partition_id]; let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?; + spilled_bytes += partition_writer.spill( + &mut iter, + &self.runtime, + &self.metrics, + self.write_buffer_size, + )?; } let mut timer = self.metrics.mempool_time.timer(); @@ -795,6 +816,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut output_data, &self.metrics.encode_time, &self.metrics.write_time, + self.write_buffer_size, )?; } @@ -862,6 +884,7 @@ impl SinglePartitionShufflePartitioner { metrics: ShuffleRepartitionerMetrics, batch_size: usize, codec: CompressionCodec, + write_buffer_size: usize, ) -> Result { let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; @@ -872,7 +895,8 @@ impl SinglePartitionShufflePartitioner { .open(output_data_path) .map_err(to_df_err)?; - let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file); + let output_data_writer = + BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size); Ok(Self { output_data_writer, @@ -1131,6 +1155,7 @@ impl PartitionWriter { iter: &mut PartitionedBatchIterator, runtime: &RuntimeEnv, metrics: &ShuffleRepartitionerMetrics, + write_buffer_size: usize, ) -> Result { if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; @@ -1139,6 +1164,7 @@ impl PartitionWriter { let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; @@ -1194,10 +1220,7 @@ struct BufBatchWriter, W: Write> { } impl, W: Write> BufBatchWriter { - fn new(shuffle_block_writer: S, writer: W) -> Self { - // 1MB should be good enough to avoid frequent system calls, - // and also won't cause too much memory usage - let buffer_max_size = 1024 * 1024; + fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self { Self { shuffle_block_writer, writer, @@ -1343,6 +1366,7 @@ mod test { 1024, CompressionCodec::Lz4Frame, false, + 1024 * 1024, // write_buffer_size: 1MB default ) .unwrap(); @@ -1439,6 +1463,7 @@ mod test { "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), false, + 1024 * 1024, // write_buffer_size: 1MB default ) .unwrap(); diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 9d4435751d..f30c503f48 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -236,6 +236,9 @@ message ShuffleWriter { CompressionCodec codec = 5; int32 compression_level = 6; bool tracing_enabled = 7; + // Size of the write buffer in bytes used when writing shuffle data to disk. + // Larger values may improve write performance but use more memory. + int32 write_buffer_size = 8; } message ParquetWriter { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 43a1e5b9a0..f64c8b8a8c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -183,6 +183,7 @@ class CometNativeShuffleWriter[K, V]( } shuffleWriterBuilder.setCompressionLevel( CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) + shuffleWriterBuilder.setWriteBufferSize(CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get()) outputPartitioning match { case p if isSinglePartitioning(p) => From fd2b1198bb0ef2f193f8982383f6938d22ecd1da Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 13 Dec 2025 13:44:49 -0700 Subject: [PATCH 2/4] docs --- docs/source/user-guide/latest/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 5b416f927d..70bdc0306c 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -107,6 +107,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | | `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 | | `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true | +| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576 | | `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true | | `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true | | `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | From 480411bd70f5220ab64a0c250e7b40d678993241 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 13 Dec 2025 14:42:08 -0700 Subject: [PATCH 3/4] fix --- native/core/benches/shuffle_writer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 70f1acc8db..0857ef78c6 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -152,6 +152,7 @@ fn create_shuffle_writer_exec( "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), false, + 1024 * 1024, ) .unwrap() } From 817e9f0426a1b9e014d09dfae3757ee3712bc1a6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 15 Dec 2025 14:49:17 -0700 Subject: [PATCH 4/4] address feedback --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 +++--- docs/source/user-guide/latest/configs.md | 2 +- .../comet/execution/shuffle/CometNativeShuffleWriter.scala | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index ad79183214..cccad53c53 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -441,16 +441,16 @@ object CometConf extends ShimCometConf { .intConf .createWithDefault(8192) - val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Int] = + val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize") .category(CATEGORY_SHUFFLE) .doc("Size of the write buffer in bytes used by the native shuffle writer when writing " + "shuffle data to disk. Larger values may improve write performance by reducing " + "the number of system calls, but will use more memory. " + "The default is 1MB which provides a good balance between performance and memory usage.") - .intConf + .bytesConf(ByteUnit.MiB) .checkValue(v => v > 0, "Write buffer size must be positive") - .createWithDefault(1024 * 1024) + .createWithDefault(1) val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf( "spark.comet.shuffle.preferDictionary.ratio") diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 70bdc0306c..db7d2ce32b 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -107,7 +107,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | | `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 | | `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true | -| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576 | +| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b | | `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true | | `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true | | `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index f64c8b8a8c..cb2041983e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -183,7 +183,8 @@ class CometNativeShuffleWriter[K, V]( } shuffleWriterBuilder.setCompressionLevel( CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) - shuffleWriterBuilder.setWriteBufferSize(CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get()) + shuffleWriterBuilder.setWriteBufferSize( + CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().max(Int.MaxValue).toInt) outputPartitioning match { case p if isSinglePartitioning(p) =>