diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index 4c9b13a174..10fc1375f0 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -70,5 +70,7 @@ runs: shell: bash run: | cd native + # Set LD_LIBRARY_PATH to include JVM library path for tests that use JNI + export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH} RUST_BACKTRACE=1 cargo nextest run diff --git a/native/Cargo.lock b/native/Cargo.lock index bf9a7ea2da..be936cb4e4 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -598,9 +598,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.15.1" +version = "1.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f" +checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288" dependencies = [ "aws-lc-sys", "zeroize", @@ -608,9 +608,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.34.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6" +checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1" dependencies = [ "cc", "cmake", @@ -789,9 +789,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.8" +version = "0.61.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6864c190cbb8e30cf4b77b2c8f3b6dfffa697a09b7218d2f7cd3d4c4065a9f7" +checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" dependencies = [ "aws-smithy-types", ] @@ -817,9 +817,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" +checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1145,9 +1145,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.0" +version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "bytecheck" @@ -1361,9 +1361,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cmake" -version = "0.1.54" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" dependencies = [ "cc", ] @@ -1852,7 +1852,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "fs-hdfs", + "datafusion-comet-fs-hdfs3", "fs-hdfs3", "futures", "object_store", @@ -2728,20 +2728,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fs-hdfs" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25f164ff6334da016dffd1c29a3c05b81c35b857ef829d3fa9e58ae8d3e6f76b" -dependencies = [ - "bindgen 0.64.0", - "cc", - "lazy_static", - "libc", - "log", - "url", -] - [[package]] name = "fs-hdfs3" version = "0.1.12" @@ -5005,9 +4991,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.11.2" +version = "0.11.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "f08d6a905edb32d74a5d5737a0c9d7e950c312f3c46cb0ca0a2ca09ea11878a0" +checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885" dependencies = [ "bytemuck", "byteorder", @@ -5159,9 +5145,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" dependencies = [ "web-time", "zeroize", @@ -5878,18 +5864,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.3" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ "serde_core", ] [[package]] name = "toml_edit" -version = "0.23.9" +version = "0.23.10+spec-1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832" +checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ "indexmap 2.12.1", "toml_datetime", @@ -5899,9 +5885,9 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.4" +version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" dependencies = [ "winnow", ] @@ -5953,9 +5939,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -5975,9 +5961,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", ] diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index c79d603095..1a409464d3 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -99,7 +99,7 @@ datafusion-functions-nested = { version = "51.0.0" } [features] backtrace = ["datafusion/backtrace"] -default = [] +default = ["hdfs-opendal"] hdfs = ["datafusion-comet-objectstore-hdfs"] hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 57246abf7f..2ca1e9cfd5 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -22,9 +22,13 @@ use std::{ fmt, fmt::{Debug, Formatter}, fs::File, + io::Cursor, sync::Arc, }; +use opendal::{services::Hdfs, Operator}; +use url::Url; + use arrow::datatypes::{Schema, SchemaRef}; use 
arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -49,6 +53,134 @@ use parquet::{ use crate::execution::shuffle::CompressionCodec; +/// Enum representing different types of Arrow writers based on storage backend +enum ParquetWriter { + /// Writer for local file system + LocalFile(ArrowWriter<File>), + /// Writer for HDFS or other remote storage (writes to an in-memory buffer) + /// Contains the Arrow writer, an optional opendal writer, the HDFS operator, and the destination path + /// The Arrow writer converts incoming batches to Parquet format in the in-memory buffer + /// The opendal::Writer is created lazily on first write + Remote( + ArrowWriter<Cursor<Vec<u8>>>, + Option<opendal::Writer>, + Operator, + String, + ), +} + +impl ParquetWriter { + /// Write a RecordBatch to the underlying writer + async fn write( + &mut self, + batch: &RecordBatch, + ) -> std::result::Result<(), parquet::errors::ParquetError> { + match self { + ParquetWriter::LocalFile(writer) => writer.write(batch), + ParquetWriter::Remote( + arrow_parquet_buffer_writer, + hdfs_writer_opt, + op, + output_path, + ) => { + // Write batch to in-memory buffer + arrow_parquet_buffer_writer.write(batch)?; + + // Flush and get the current buffer content + arrow_parquet_buffer_writer.flush()?; + let cursor = arrow_parquet_buffer_writer.inner_mut(); + let current_data = cursor.get_ref().clone(); + + // Create HDFS writer lazily on first write + if hdfs_writer_opt.is_none() { + let writer = op.writer(output_path.as_str()).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to create HDFS writer for '{}': {}", output_path, e) + .into(), + ) + })?; + *hdfs_writer_opt = Some(writer); + } + + // Write the accumulated data to HDFS + if let Some(hdfs_writer) = hdfs_writer_opt { + hdfs_writer.write(current_data).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!( + "Failed to write batch to HDFS file '{}': {}", + output_path, e + ) + .into(), + ) + })?; + } + + // Clear the buffer after upload + cursor.get_mut().clear(); + cursor.set_position(0); + + Ok(()) + } + } + } + + /// Close the writer and finalize the file + async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> { + match self { + ParquetWriter::LocalFile(writer) => { + writer.close()?; + Ok(()) + } + ParquetWriter::Remote( + arrow_parquet_buffer_writer, + mut hdfs_writer_opt, + op, + output_path, + ) => { + // Close the arrow writer to finalize parquet format + let cursor = arrow_parquet_buffer_writer.into_inner()?; + let final_data = cursor.into_inner(); + + // Create HDFS writer if not already created + if hdfs_writer_opt.is_none() && !final_data.is_empty() { + let writer = op.writer(output_path.as_str()).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to create HDFS writer for '{}': {}", output_path, e) + .into(), + ) + })?; + hdfs_writer_opt = Some(writer); + } + + // Write any remaining data + if !final_data.is_empty() { + if let Some(mut hdfs_writer) = hdfs_writer_opt { + hdfs_writer.write(final_data).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!( + "Failed to write final data to HDFS file '{}': {}", + output_path, e + ) + .into(), + ) + })?; + + // Close the HDFS writer + hdfs_writer.close().await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to close HDFS writer for '{}': {}", output_path, e) + .into(), + ) + })?; + } + } + + Ok(()) + } + } + } +} + /// Parquet writer operator that writes input batches to a Parquet file #[derive(Debug)] pub struct ParquetWriterExec { @@ 
-119,6 +251,129 @@ impl ParquetWriterExec { CompressionCodec::Snappy => Ok(Compression::SNAPPY), } } + + /// Create an Arrow writer based on the storage scheme inferred from the output file path + /// + /// # Arguments + /// * `output_file_path` - The full path to the output file + /// * `schema` - The Arrow schema for the Parquet file + /// * `props` - Writer properties including compression + /// + /// # Returns + /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme + /// * `Err(DataFusionError)` - If writer creation fails + fn create_arrow_writer( + output_file_path: &str, + schema: SchemaRef, + props: WriterProperties, + ) -> Result<ParquetWriter> { + // Determine storage scheme from output_file_path + let storage_scheme = if output_file_path.starts_with("hdfs://") { + "hdfs" + } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") { + "s3" + } else { + "local" + }; + + match storage_scheme { + "hdfs" => { + // Parse the output_file_path to extract namenode and path + // Expected format: hdfs://namenode:port/path/to/file + let url = Url::parse(output_file_path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to parse HDFS URL '{}': {}", + output_file_path, e + )) + })?; + + // Extract namenode (scheme + host + port) + let namenode = format!( + "{}://{}{}", + url.scheme(), + url.host_str().unwrap_or("localhost"), + url.port() + .map(|p| format!(":{}", p)) + .unwrap_or_else(|| ":9000".to_string()) + ); + + // Extract the path (without the scheme and host) + let hdfs_path = url.path().to_string(); + + // For remote storage (HDFS, S3), write to an in-memory buffer + let buffer = Vec::new(); + let cursor = Cursor::new(buffer); + let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create {} writer: {}", + storage_scheme, e + )) + })?; + + let builder = Hdfs::default().name_node(&namenode); + let op = Operator::new(builder) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create HDFS operator for '{}' (namenode: {}): {}", + output_file_path, namenode, e + )) + })?
+ .finish(); + + // HDFS writer will be created lazily on first write + // Use only the path part for the HDFS writer + Ok(ParquetWriter::Remote( + arrow_parquet_buffer_writer, + None, + op, + hdfs_path, + )) + } + "local" => { + // For a local file system, write directly to file + // Strip file:// or file: prefix if present + let local_path = output_file_path + .strip_prefix("file://") + .or_else(|| output_file_path.strip_prefix("file:")) + .unwrap_or(output_file_path); + + // Extract the parent directory from the file path + let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| { + DataFusionError::Execution(format!( + "Failed to extract parent directory from path '{}'", + local_path + )) + })?; + + // Create the parent directory if it doesn't exist + std::fs::create_dir_all(output_dir).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output directory '{}': {}", + output_dir.display(), + e + )) + })?; + + let file = File::create(local_path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output file '{}': {}", + local_path, e + )) + })?; + + let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create local file writer: {}", e)) + })?; + Ok(ParquetWriter::LocalFile(writer)) + } + _ => Err(DataFusionError::Execution(format!( + "Unsupported storage scheme: {}", + storage_scheme + ))), + } + } } impl DisplayAs for ParquetWriterExec { @@ -217,47 +472,23 @@ impl ExecutionPlan for ParquetWriterExec { .collect(); let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); - // Strip file:// or file: prefix if present - let local_path = work_dir - .strip_prefix("file://") - .or_else(|| work_dir.strip_prefix("file:")) - .unwrap_or(&work_dir) - .to_string(); - - // Create output directory - std::fs::create_dir_all(&local_path).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output directory '{}': {}", - local_path, e - )) - })?; - // Generate part file name for this partition // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename let part_file = if let Some(attempt_id) = task_attempt_id { format!( "{}/part-{:05}-{:05}.parquet", - local_path, self.partition_id, attempt_id + work_dir, self.partition_id, attempt_id ) } else { - format!("{}/part-{:05}.parquet", local_path, self.partition_id) + format!("{}/part-{:05}.parquet", work_dir, self.partition_id) }; - // Create the Parquet file - let file = File::create(&part_file).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output file '{}': {}", - part_file, e - )) - })?; - // Configure writer properties let props = WriterProperties::builder() .set_compression(compression) .build(); - let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props)) - .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?; + let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); @@ -286,12 +517,12 @@ impl ExecutionPlan for ParquetWriterExec { batch }; - writer.write(&renamed_batch).map_err(|e| { + writer.write(&renamed_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; } - writer.close().map_err(|e| { + writer.close().await.map_err(|e| { DataFusionError::Execution(format!("Failed to close writer: {}", e)) })?; @@ -322,3 
+553,274 @@ impl ExecutionPlan for ParquetWriterExec { ))) } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data + /// Example batch_id 1 -> 0..1000, 2 -> 1001..2000 + fn create_test_record_batch(batch_id: i32) -> Result { + assert!(batch_id > 0, "batch_id must be greater than 0"); + let num_rows = batch_id * 1000; + + let int_array = Int32Array::from_iter_values(((batch_id - 1) * 1000)..num_rows); + + let string_values: Vec = (((batch_id - 1) * 1000)..num_rows) + .map(|i| format!("value_{}", i)) + .collect(); + let string_array = StringArray::from(string_values); + + // Define schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + // Create RecordBatch + RecordBatch::try_new(schema, vec![Arc::new(int_array), Arc::new(string_array)]) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] + async fn test_write_to_hdfs_sync() -> Result<()> { + use opendal::services::Hdfs; + use opendal::Operator; + + // Configure HDFS connection + let namenode = "hdfs://namenode:9000"; + let output_path = "/user/test_write/data.parquet"; + + // Create OpenDAL HDFS operator + let builder = Hdfs::default().name_node(namenode); + let op = Operator::new(builder) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)) + })? + .finish(); + + let mut hdfs_writer = op.writer(output_path).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) + })?; + + let mut buffer = Cursor::new(Vec::new()); + let mut writer = + ArrowWriter::try_new(&mut buffer, create_test_record_batch(1)?.schema(), None)?; + + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + writer.write(&record_batch)?; + + println!( + "Successfully wrote 1000 rows to HDFS at {}{}", + namenode, output_path + ); + } + + writer.close()?; + + hdfs_writer.write(buffer.into_inner()).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write with HDFS writer: {}", e)) + })?; + + hdfs_writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close HDFS writer: {}", e)) + })?; + + Ok(()) + } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] + async fn test_write_to_hdfs_streaming() -> Result<()> { + use opendal::services::Hdfs; + use opendal::Operator; + + // Configure HDFS connection + let namenode = "hdfs://namenode:9000"; + let output_path = "/user/test_write_streaming/data.parquet"; + + // Create OpenDAL HDFS operator + let builder = Hdfs::default().name_node(namenode); + let op = Operator::new(builder) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)) + })? 
+ .finish(); + + // Create a single HDFS writer for the entire file + let mut hdfs_writer = op.writer(output_path).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) + })?; + + // Create a single ArrowWriter that will be used for all batches + let buffer = Cursor::new(Vec::new()); + let mut writer = ArrowWriter::try_new(buffer, create_test_record_batch(1)?.schema(), None)?; + + // Write each batch and upload to HDFS immediately (streaming approach) + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + // Write the batch to the parquet writer + writer.write(&record_batch)?; + + // Flush the writer to ensure data is written to the buffer + writer.flush()?; + + // Get the current buffer content through the writer + let cursor = writer.inner_mut(); + let current_data = cursor.get_ref().clone(); + + // Write the accumulated data to HDFS + hdfs_writer.write(current_data).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch {} to HDFS: {}", i, e)) + })?; + + // Clear the buffer for the next iteration + cursor.get_mut().clear(); + cursor.set_position(0); + + println!( + "Successfully streamed batch {} (1000 rows) to HDFS at {}{}", + i, namenode, output_path + ); + } + + // Close the ArrowWriter to finalize the parquet file + let cursor = writer.into_inner()?; + + // Write any remaining data from closing the writer + let final_data = cursor.into_inner(); + if !final_data.is_empty() { + hdfs_writer.write(final_data).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write final data to HDFS: {}", e)) + })?; + } + + // Close the HDFS writer + hdfs_writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close HDFS writer: {}", e)) + })?; + + println!( + "Successfully completed streaming write of 5 batches (5000 total rows) to HDFS at {}{}", + namenode, output_path + ); + + Ok(()) + } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] + async fn test_parquet_writer_streaming() -> Result<()> { + // Configure output path + let output_path = "/user/test_parquet_writer_streaming/data.parquet"; + + // Configure writer properties + let props = WriterProperties::builder() + .set_compression(Compression::UNCOMPRESSED) + .build(); + + // Create ParquetWriter using the create_arrow_writer method + // Use full HDFS URL format + let full_output_path = format!("hdfs://namenode:9000{}", output_path); + let mut writer = ParquetWriterExec::create_arrow_writer( + &full_output_path, + create_test_record_batch(1)?.schema(), + props, + )?; + + // Write 5 batches in a loop + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + writer.write(&record_batch).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) + })?; + + println!( + "Successfully wrote batch {} (1000 rows) using ParquetWriter", + i + ); + } + + // Close the writer + writer + .close() + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?; + + println!( + "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}", + output_path + ); + + Ok(()) + } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] + async fn test_parquet_writer_exec_with_memory_input() -> Result<()> { + use datafusion::datasource::memory::MemorySourceConfig; + use 
datafusion::datasource::source::DataSourceExec; + use datafusion::prelude::SessionContext; + + // Create 5 batches for the DataSourceExec input + let mut batches = Vec::new(); + for i in 1..=5 { + batches.push(create_test_record_batch(i)?); + } + + // Get schema from the first batch + let schema = batches[0].schema(); + + // Create DataSourceExec with MemorySourceConfig containing the 5 batches as a single partition + let partitions = vec![batches]; + let memory_source_config = MemorySourceConfig::try_new(&partitions, schema, None)?; + let memory_exec = Arc::new(DataSourceExec::new(Arc::new(memory_source_config))); + + // Create ParquetWriterExec with DataSourceExec as input + let output_path = "unused".to_string(); + let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string(); + let column_names = vec!["id".to_string(), "name".to_string()]; + + let parquet_writer = ParquetWriterExec::try_new( + memory_exec, + output_path, + work_dir, + None, // job_id + Some(123), // task_attempt_id + CompressionCodec::None, + 0, // partition_id + column_names, + )?; + + // Create a session context and execute the plan + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + // Execute partition 0 + let mut stream = parquet_writer.execute(0, task_ctx)?; + + // Consume the stream (this triggers the write) + while let Some(batch_result) = stream.try_next().await? { + // The stream should be empty as ParquetWriterExec returns empty batches + assert_eq!(batch_result.num_rows(), 0); + } + + println!( + "Successfully completed ParquetWriterExec test with DataSourceExec input (5 batches, 5000 total rows)" + ); + + Ok(()) + } +} diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 10ecefad5b..2b883bd7df 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -39,10 +39,17 @@ use log4rs::{ }; use once_cell::sync::OnceCell; -#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] +#[cfg(all( + not(target_env = "msvc"), + feature = "jemalloc", + not(feature = "mimalloc") +))] use tikv_jemallocator::Jemalloc; -#[cfg(feature = "mimalloc")] +#[cfg(all( + feature = "mimalloc", + not(all(not(target_env = "msvc"), feature = "jemalloc")) +))] use mimalloc::MiMalloc; use errors::{try_unwrap_or_throw, CometError, CometResult}; @@ -55,11 +62,18 @@ pub mod execution; mod jvm_bridge; pub mod parquet; -#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] +#[cfg(all( + not(target_env = "msvc"), + feature = "jemalloc", + not(feature = "mimalloc") +))] #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; -#[cfg(feature = "mimalloc")] +#[cfg(all( + feature = "mimalloc", + not(all(not(target_env = "msvc"), feature = "jemalloc")) +))] #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 0b5c45d24d..c9a27d7dcb 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -369,9 +369,11 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) -> } } -// Mirrors object_store::parse::parse_url for the hdfs object store -#[cfg(feature = "hdfs")] -fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_store::Error> { +// Creates an HDFS object store from a URL using the native HDFS implementation +#[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))] +fn create_hdfs_object_store( + url: &Url, +) -> Result<(Box, Path), object_store::Error> { match 
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref()) { Some(object_store) => { @@ -385,8 +387,11 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_stor } } +// Creates an HDFS object store from a URL using OpenDAL #[cfg(feature = "hdfs-opendal")] -fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_store::Error> { +fn create_hdfs_object_store( + url: &Url, +) -> Result<(Box, Path), object_store::Error> { let name_node = get_name_node_uri(url)?; let builder = opendal::services::Hdfs::default().name_node(&name_node); @@ -422,8 +427,11 @@ fn get_name_node_uri(url: &Url) -> Result { } } +// Stub implementation when HDFS support is not enabled #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] -fn parse_hdfs_url(_url: &Url) -> Result<(Box, Path), object_store::Error> { +fn create_hdfs_object_store( + _url: &Url, +) -> Result<(Box, Path), object_store::Error> { Err(object_store::Error::Generic { store: "HadoopFileSystem", source: "Hdfs support is not enabled in this build".into(), @@ -454,7 +462,7 @@ pub(crate) fn prepare_object_store_with_configs( ); let (object_store, object_store_path): (Box, Path) = if is_hdfs_scheme { - parse_hdfs_url(&url) + create_hdfs_object_store(&url) } else if scheme == "s3" { objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) } else { @@ -469,24 +477,59 @@ pub(crate) fn prepare_object_store_with_configs( #[cfg(test)] mod tests { - use crate::execution::operators::ExecutionError; - use crate::parquet::parquet_support::prepare_object_store_with_configs; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use datafusion::execution::object_store::ObjectStoreUrl; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use datafusion::execution::runtime_env::RuntimeEnv; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use object_store::path::Path; - use std::collections::HashMap; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use std::sync::Arc; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use url::Url; + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] + use crate::execution::operators::ExecutionError; + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] + use std::collections::HashMap; + /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] pub(crate) fn prepare_object_store( runtime_env: Arc, url: String, ) -> Result<(ObjectStoreUrl, Path), ExecutionError> { + use crate::parquet::parquet_support::prepare_object_store_with_configs; + prepare_object_store_with_configs(runtime_env, url, &HashMap::new()) + } + + /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path + #[cfg(feature = "hdfs")] + pub(crate) fn prepare_object_store( + runtime_env: Arc, + url: String, + ) -> Result<(ObjectStoreUrl, Path), crate::execution::operators::ExecutionError> { + use crate::parquet::parquet_support::prepare_object_store_with_configs; + use std::collections::HashMap; prepare_object_store_with_configs(runtime_env, url, &HashMap::new()) } - #[cfg(not(feature = "hdfs"))] + #[cfg(all(not(feature = "hdfs"), not(feature = 
"hdfs-opendal")))] #[test] fn test_prepare_object_store() { use crate::execution::operators::ExecutionError; diff --git a/native/fs-hdfs/Cargo.toml b/native/fs-hdfs/Cargo.toml index 98005c860f..a47b69a1f2 100644 --- a/native/fs-hdfs/Cargo.toml +++ b/native/fs-hdfs/Cargo.toml @@ -32,7 +32,7 @@ publish = false readme = "README.md" [lib] -name = "hdfs" +name = "fs_hdfs" path = "src/lib.rs" [features] diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml index dc8f970ef7..c66d71c203 100644 --- a/native/hdfs/Cargo.toml +++ b/native/hdfs/Cargo.toml @@ -33,7 +33,7 @@ edition = { workspace = true } [features] default = ["hdfs", "try_spawn_blocking"] -hdfs = ["fs-hdfs"] +hdfs = ["fs_hdfs"] hdfs3 = ["fs-hdfs3"] # Used for trying to spawn a blocking thread for implementing each object store interface when running in a tokio runtime try_spawn_blocking = [] @@ -42,7 +42,7 @@ try_spawn_blocking = [] async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } -fs-hdfs = { version = "^0.1.12", optional = true } +fs_hdfs = { package = "datafusion-comet-fs-hdfs3", path = "../fs-hdfs", optional = true } fs-hdfs3 = { version = "^0.1.12", optional = true } futures = { workspace = true } object_store = { workspace = true } diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs index b49e879429..a93774cffe 100644 --- a/native/hdfs/src/object_store/hdfs.rs +++ b/native/hdfs/src/object_store/hdfs.rs @@ -26,9 +26,9 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use fs_hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs}; +use fs_hdfs::walkdir::HdfsWalkDir; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs}; -use hdfs::walkdir::HdfsWalkDir; use object_store::{ path::{self, Path}, Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, @@ -422,7 +422,7 @@ impl ObjectStore for HadoopFileSystem { hdfs.delete(&to, false).map_err(to_error)?; } - hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) + fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) .map_err(to_error)?; Ok(()) @@ -437,7 +437,7 @@ impl ObjectStore for HadoopFileSystem { let to = HadoopFileSystem::path_to_filesystem(to); maybe_spawn_blocking(move || { - hdfs.rename(&from, &to).map_err(to_error)?; + hdfs.rename(&from, &to, true).map_err(to_error)?; Ok(()) }) @@ -459,7 +459,7 @@ impl ObjectStore for HadoopFileSystem { }); } - hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) + fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) .map_err(to_error)?; Ok(()) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 7fdf055217..8349329841 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -52,8 +52,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case cmd: InsertIntoHadoopFsRelationCommand => cmd.fileFormat match { case _: ParquetFileFormat => - if (!cmd.outputPath.toString.startsWith("file:")) { - return Unsupported(Some("Only local filesystem output paths are supported")) + if (!cmd.outputPath.toString.startsWith("file:") 
&& !cmd.outputPath.toString + .startsWith("hdfs:")) { + return Unsupported(Some("Only HDFS/local filesystem output paths are supported")) } if (cmd.bucketSpec.isDefined) {
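
ParquetWriter::Remote above hinges on one pattern: an ArrowWriter targets a Cursor<Vec<u8>>, each flush() finalizes the in-progress row group into that buffer, and the buffer is drained and shipped before the next batch. Below is a minimal, local-only sketch of that pattern, assuming the same arrow/parquet crates this PR builds against (which expose ArrowWriter::inner_mut and into_inner); the `uploaded` vector stands in for the opendal writer and is not part of the PR.

```rust
// Local sketch of the buffer-and-drain pattern used by ParquetWriter::Remote.
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let mut writer = ArrowWriter::try_new(Cursor::new(Vec::new()), Arc::clone(&schema), None)?;

    // Stand-in for the remote sink (an opendal::Writer in the real code).
    let mut uploaded: Vec<u8> = Vec::new();

    for start in (0..5000).step_by(1000) {
        let ids = Int32Array::from_iter_values(start..start + 1000);
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)])?;

        writer.write(&batch)?;
        writer.flush()?; // close the in-progress row group so its bytes land in the buffer

        // Drain whatever the writer has produced so far and "upload" it.
        let cursor = writer.inner_mut();
        uploaded.extend_from_slice(cursor.get_ref());
        cursor.get_mut().clear();
        cursor.set_position(0);
    }

    // Closing the writer emits the footer; the remaining bytes complete the file.
    let cursor = writer.into_inner()?;
    uploaded.extend_from_slice(&cursor.into_inner());

    // The concatenated chunks start with the Parquet magic and form one valid file.
    assert_eq!(&uploaded[..4], b"PAR1");
    Ok(())
}
```

Because the footer is only produced on close, concatenating the drained chunks with the final bytes yields the complete Parquet file, which is why appending each chunk to HDFS in order is sufficient.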
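
The Remote variant also defers creating its opendal::Writer until there is a first chunk to upload, then appends chunks in order and closes the writer once. A sketch of that writer lifecycle is shown below against opendal's in-memory service so it runs without a cluster; it assumes opendal with its default services-memory feature plus tokio for the async runtime, and the object path is arbitrary. Swapping Memory for services::Hdfs with a reachable namenode gives the code path the ignored tests exercise.

```rust
// Sketch of the lazy opendal writer lifecycle, backed by the in-memory service.
use opendal::{services::Memory, Operator};

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let op = Operator::new(Memory::default())?.finish();

    // Created lazily in the PR: only once there is a first chunk to upload.
    let mut writer = op.writer("part-00000.parquet").await?;

    // Each drained buffer chunk is appended in order; concatenation yields the file.
    writer.write(vec![1u8, 2, 3]).await?;
    writer.write(vec![4u8, 5, 6]).await?;
    writer.close().await?;

    assert_eq!(op.stat("part-00000.parquet").await?.content_length(), 6);
    Ok(())
}
```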
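
create_arrow_writer splits the output URL into a namenode address for the Hdfs builder and a filesystem-internal path for the writer, defaulting to port 9000 when the URL carries none. A standalone sketch of that split follows; split_hdfs_path is an illustrative helper, not a function from this PR.

```rust
// Sketch of the namenode/path split performed in create_arrow_writer.
use url::Url;

fn split_hdfs_path(output: &str) -> Result<(String, String), url::ParseError> {
    let url = Url::parse(output)?;
    let namenode = format!(
        "{}://{}{}",
        url.scheme(),
        url.host_str().unwrap_or("localhost"),
        url.port()
            .map(|p| format!(":{}", p))
            .unwrap_or_else(|| ":9000".to_string())
    );
    Ok((namenode, url.path().to_string()))
}

fn main() -> Result<(), url::ParseError> {
    let (namenode, path) = split_hdfs_path("hdfs://namenode:9000/user/test/data.parquet")?;
    assert_eq!(namenode, "hdfs://namenode:9000");
    assert_eq!(path, "/user/test/data.parquet");

    // Without an explicit port, the PR falls back to 9000.
    let (namenode, _) = split_hdfs_path("hdfs://namenode/user/test/data.parquet")?;
    assert_eq!(namenode, "hdfs://namenode:9000");
    Ok(())
}
```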
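
The lib.rs change makes the jemalloc and mimalloc cfg guards mutually exclusive because Rust accepts at most one #[global_allocator] item per binary; with both features enabled, the previous guards could emit two. The sketch below illustrates the guard pattern in isolation, using the system allocator and two hypothetical feature names (alloc-a, alloc-b) rather than the real jemalloc/mimalloc dependencies; it does not reproduce the PR's exact precedence between the two features.

```rust
// Hypothetical features "alloc-a" and "alloc-b" stand in for jemalloc/mimalloc.
// Without the not(...) guard on the first item, enabling both features would
// fail with "cannot define multiple global allocators".
use std::alloc::System;

#[cfg(all(feature = "alloc-a", not(feature = "alloc-b")))]
#[global_allocator]
static GLOBAL: System = System;

#[cfg(feature = "alloc-b")]
#[global_allocator]
static GLOBAL: System = System;

fn main() {
    // Allocates through whichever allocator was selected
    // (or the default one when neither feature is enabled).
    let v = vec![1, 2, 3];
    assert_eq!(v.len(), 3);
}
```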