diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 7eaae60552..cccad53c53 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -441,6 +441,17 @@ object CometConf extends ShimCometConf { .intConf .createWithDefault(8192) + val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize") + .category(CATEGORY_SHUFFLE) + .doc("Size of the write buffer in bytes used by the native shuffle writer when writing " + + "shuffle data to disk. Larger values may improve write performance by reducing " + + "the number of system calls, but will use more memory. " + + "The default is 1MB which provides a good balance between performance and memory usage.") + .bytesConf(ByteUnit.MiB) + .checkValue(v => v > 0, "Write buffer size must be positive") + .createWithDefault(1) + val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf( "spark.comet.shuffle.preferDictionary.ratio") .category(CATEGORY_SHUFFLE) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 5b416f927d..db7d2ce32b 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -107,6 +107,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | | `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 | | `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. 
Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true | +| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b | | `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true | | `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true | | `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. 
| 10.0 | diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 70f1acc8db..0857ef78c6 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -152,6 +152,7 @@ fn create_shuffle_writer_exec( "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), false, + 1024 * 1024, ) .unwrap() } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 712bedc2df..67b2523be3 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1244,6 +1244,12 @@ impl PhysicalPlanner { ))), }?; + // The proto field is a signed int32: `as usize` would wrap a negative value into a huge + // size, so any non-positive value falls back to the former fixed 1 MiB buffer. + let write_buffer_size = usize::try_from(writer.write_buffer_size) + .ok() + .filter(|&size| size > 0) + .unwrap_or(1024 * 1024); let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, @@ -1251,6 +1257,7 @@ impl PhysicalPlanner { writer.output_data_file.clone(), writer.output_index_file.clone(), writer.tracing_enabled, + write_buffer_size, )?); Ok(( diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 4d395880c2..44a3cd67aa 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -78,10 +78,13 @@ pub struct ShuffleWriterExec { /// The compression codec to use when compressing shuffle blocks codec: CompressionCodec, tracing_enabled: bool, + /// Size of the write buffer in bytes + write_buffer_size: usize, } impl ShuffleWriterExec { /// Create a new ShuffleWriterExec + #[allow(clippy::too_many_arguments)] pub fn try_new( input: Arc, partitioning: CometPartitioning, @@ -89,6 +92,7 @@ impl ShuffleWriterExec { output_data_file: String, output_index_file: String, tracing_enabled: bool, + write_buffer_size: usize, ) -> Result { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), @@ -106,6 +110,7 @@ impl ShuffleWriterExec { cache, codec, tracing_enabled, + write_buffer_size, }) } } @@ -169,6 
+174,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.output_data_file.clone(), self.output_index_file.clone(), self.tracing_enabled, + self.write_buffer_size, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -195,6 +201,7 @@ impl ExecutionPlan for ShuffleWriterExec { context, self.codec.clone(), self.tracing_enabled, + self.write_buffer_size, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -214,6 +221,7 @@ async fn external_shuffle( context: Arc, codec: CompressionCodec, tracing_enabled: bool, + write_buffer_size: usize, ) -> Result { with_trace_async("external_shuffle", tracing_enabled, || async { let schema = input.schema(); @@ -227,6 +235,7 @@ async fn external_shuffle( metrics, context.session_config().batch_size(), codec, + write_buffer_size, )?) } _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( @@ -240,6 +249,7 @@ async fn external_shuffle( context.session_config().batch_size(), codec, tracing_enabled, + write_buffer_size, )?), }; @@ -331,6 +341,8 @@ struct MultiPartitionShuffleRepartitioner { /// Reservation for repartitioning reservation: MemoryReservation, tracing_enabled: bool, + /// Size of the write buffer in bytes + write_buffer_size: usize, } #[derive(Default)] @@ -362,6 +374,7 @@ impl MultiPartitionShuffleRepartitioner { batch_size: usize, codec: CompressionCodec, tracing_enabled: bool, + write_buffer_size: usize, ) -> Result { let num_output_partitions = partitioning.partition_count(); assert_ne!( @@ -407,6 +420,7 @@ impl MultiPartitionShuffleRepartitioner { batch_size, reservation, tracing_enabled, + write_buffer_size, }) } @@ -654,8 +668,10 @@ impl MultiPartitionShuffleRepartitioner { output_data: &mut BufWriter, encode_time: &Time, write_time: &Time, + write_buffer_size: usize, ) -> Result<()> { - let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data); + let mut buf_batch_writer = + BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size); for batch in 
partition_iter { let batch = batch?; buf_batch_writer.write(&batch, encode_time, write_time)?; @@ -714,7 +730,12 @@ impl MultiPartitionShuffleRepartitioner { for partition_id in 0..num_output_partitions { let partition_writer = &mut self.partition_writers[partition_id]; let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?; + spilled_bytes += partition_writer.spill( + &mut iter, + &self.runtime, + &self.metrics, + self.write_buffer_size, + )?; } let mut timer = self.metrics.mempool_time.timer(); @@ -795,6 +816,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { &mut output_data, &self.metrics.encode_time, &self.metrics.write_time, + self.write_buffer_size, )?; } @@ -862,6 +884,7 @@ impl SinglePartitionShufflePartitioner { metrics: ShuffleRepartitionerMetrics, batch_size: usize, codec: CompressionCodec, + write_buffer_size: usize, ) -> Result { let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; @@ -872,7 +895,8 @@ impl SinglePartitionShufflePartitioner { .open(output_data_path) .map_err(to_df_err)?; - let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file); + let output_data_writer = + BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size); Ok(Self { output_data_writer, @@ -1131,6 +1155,7 @@ impl PartitionWriter { iter: &mut PartitionedBatchIterator, runtime: &RuntimeEnv, metrics: &ShuffleRepartitionerMetrics, + write_buffer_size: usize, ) -> Result { if let Some(batch) = iter.next() { self.ensure_spill_file_created(runtime)?; @@ -1139,6 +1164,7 @@ impl PartitionWriter { let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; @@ -1194,10 +1220,7 @@ struct BufBatchWriter, W: Write> { 
} impl, W: Write> BufBatchWriter { - fn new(shuffle_block_writer: S, writer: W) -> Self { - // 1MB should be good enough to avoid frequent system calls, - // and also won't cause too much memory usage - let buffer_max_size = 1024 * 1024; + fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self { Self { shuffle_block_writer, writer, @@ -1343,6 +1366,7 @@ mod test { 1024, CompressionCodec::Lz4Frame, false, + 1024 * 1024, // write_buffer_size: 1MB default ) .unwrap(); @@ -1439,6 +1463,7 @@ mod test { "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), false, + 1024 * 1024, // write_buffer_size: 1MB default ) .unwrap(); diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 9d4435751d..f30c503f48 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -236,6 +236,9 @@ message ShuffleWriter { CompressionCodec codec = 5; int32 compression_level = 6; bool tracing_enabled = 7; + // Size of the write buffer in bytes used when writing shuffle data to disk. + // Larger values may improve write performance but use more memory. + int32 write_buffer_size = 8; } message ParquetWriter { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 43a1e5b9a0..cb2041983e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -183,6 +183,10 @@ class CometNativeShuffleWriter[K, V]( } shuffleWriterBuilder.setCompressionLevel( CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get) + // The config is declared with bytesConf(ByteUnit.MiB) and default 1 (= 1 MiB), while the + // proto field is an int32 byte count: clamp to the largest whole MiB that fits an Int first. + shuffleWriterBuilder.setWriteBufferSize( + CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().min(Int.MaxValue >> 20).toInt << 20) outputPartitioning match { case p if isSinglePartitioning(p) =>