From 930f416541272c4c3947e7a6036b33d0c5ea56f2 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 17 Dec 2025 11:46:14 -0800 Subject: [PATCH 01/11] WIP: Add support for remote Parquet writer with openDAL --- .../src/execution/operators/parquet_writer.rs | 144 ++++++++++++++++-- native/core/src/parquet/parquet_support.rs | 63 +++++++- 2 files changed, 191 insertions(+), 16 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 57246abf7f..6417ba1560 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -22,9 +22,13 @@ use std::{ fmt, fmt::{Debug, Formatter}, fs::File, + io::Cursor, sync::Arc, }; +use bytes::Bytes; +use url::Url; + use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -48,6 +52,71 @@ use parquet::{ }; use crate::execution::shuffle::CompressionCodec; +use crate::parquet::parquet_support::write_to_hdfs_with_opendal_async; + +/// Enum representing different types of Arrow writers based on storage backend +enum ParquetWriter { + /// Writer for local file system + LocalFile(ArrowWriter), + /// Writer for HDFS or other remote storage (writes to in-memory buffer) + /// Contains the writer and the destination HDFS path + Remote(ArrowWriter>>, String), +} + +impl ParquetWriter { + /// Write a RecordBatch to the underlying writer + async fn write(&mut self, batch: &RecordBatch) -> std::result::Result<(), parquet::errors::ParquetError> { + match self { + ParquetWriter::LocalFile(writer) => writer.write(batch), + ParquetWriter::Remote(writer, output_path) => { + // Write batch to in-memory buffer + writer.write(batch)?; + + // Flush and get the current buffer content + writer.flush()?; + let cursor = writer.inner_mut(); + let buffer = cursor.get_ref().clone(); + + // Upload + let url = Url::parse(output_path).map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to parse URL '{}': {}", + output_path, e + )) + })?; + + write_to_hdfs_with_opendal_async(&url, Bytes::from(buffer)) + .await + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to upload to '{}': {}", + output_path, e + )) + })?; + + // Clear the buffer after upload + cursor.get_mut().clear(); + cursor.set_position(0); + + Ok(()) + }, + } + } + + /// Close the writer and return the buffer for remote writers + fn close(self) -> std::result::Result, String)>, parquet::errors::ParquetError> { + match self { + ParquetWriter::LocalFile(writer) => { + writer.close()?; + Ok(None) + } + ParquetWriter::Remote(writer, path) => { + let cursor = writer.into_inner()?; + Ok(Some((cursor.into_inner(), path))) + } + } + } +} /// Parquet writer operator that writes input batches to a Parquet file #[derive(Debug)] @@ -119,6 +188,59 @@ impl ParquetWriterExec { CompressionCodec::Snappy => Ok(Compression::SNAPPY), } } + + /// Create an Arrow writer based on the storage scheme + /// + /// # Arguments + /// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local") + /// * `output_file_path` - The full path to the output file + /// * `schema` - The Arrow schema for the Parquet file + /// * `props` - Writer properties including compression + /// + /// # Returns + /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme + /// * `Err(DataFusionError)` - If writer creation fails + fn create_arrow_writer( + storage_scheme: &str, + output_file_path: &str, + schema: 
SchemaRef, + props: WriterProperties, + ) -> Result { + match storage_scheme { + "hdfs" | "s3" => { + // For remote storage (HDFS, S3), write to an in-memory buffer + let buffer = Vec::new(); + let cursor = Cursor::new(buffer); + let writer = ArrowWriter::try_new(cursor, schema, Some(props)) + .map_err(|e| DataFusionError::Execution(format!( + "Failed to create {} writer: {}", storage_scheme, e + )))?; + Ok(ParquetWriter::Remote(writer, output_file_path.to_string())) + } + "local" => { + // For local file system, write directly to file + // Strip file:// or file: prefix if present + let local_path = output_file_path + .strip_prefix("file://") + .or_else(|| output_file_path.strip_prefix("file:")) + .unwrap_or(output_file_path); + + let file = File::create(local_path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output file '{}': {}", + local_path, e + )) + })?; + + let writer = ArrowWriter::try_new(file, schema, Some(props)) + .map_err(|e| DataFusionError::Execution(format!("Failed to create local file writer: {}", e)))?; + Ok(ParquetWriter::LocalFile(writer)) + } + _ => Err(DataFusionError::Execution(format!( + "Unsupported storage scheme: {}", storage_scheme + ))), + } + } } impl DisplayAs for ParquetWriterExec { @@ -217,6 +339,15 @@ impl ExecutionPlan for ParquetWriterExec { .collect(); let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); + // Determine storage scheme from work_dir + let storage_scheme = if work_dir.starts_with("hdfs://") { + "hdfs" + } else if work_dir.starts_with("s3://") || work_dir.starts_with("s3a://") { + "s3" + } else { + "local" + }; + // Strip file:// or file: prefix if present let local_path = work_dir .strip_prefix("file://") @@ -243,21 +374,12 @@ impl ExecutionPlan for ParquetWriterExec { format!("{}/part-{:05}.parquet", local_path, self.partition_id) }; - // Create the Parquet file - let file = File::create(&part_file).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output file '{}': {}", - part_file, e - )) - })?; - // Configure writer properties let props = WriterProperties::builder() .set_compression(compression) .build(); - let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props)) - .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?; + let mut writer = Self::create_arrow_writer(storage_scheme, &part_file, Arc::clone(&output_schema), props)?; // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); @@ -286,7 +408,7 @@ impl ExecutionPlan for ParquetWriterExec { batch }; - writer.write(&renamed_batch).map_err(|e| { + writer.write(&renamed_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; } diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 0b5c45d24d..9b3e9c3c5a 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -369,9 +369,11 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) -> } } -// Mirrors object_store::parse::parse_url for the hdfs object store +// Creates an HDFS object store from a URL using the native HDFS implementation #[cfg(feature = "hdfs")] -fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_store::Error> { +fn create_hdfs_object_store( + url: &Url, +) -> Result<(Box, Path), object_store::Error> { match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref()) { 
Some(object_store) => { @@ -385,8 +387,11 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_stor } } +// Creates an HDFS object store from a URL using OpenDAL #[cfg(feature = "hdfs-opendal")] -fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_store::Error> { +fn create_hdfs_object_store( + url: &Url, +) -> Result<(Box, Path), object_store::Error> { let name_node = get_name_node_uri(url)?; let builder = opendal::services::Hdfs::default().name_node(&name_node); @@ -401,6 +406,51 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box, Path), object_stor Ok((Box::new(store), path)) } +/// Writes data to HDFS using OpenDAL via ObjectStore trait (asynchronous version) +/// +/// # Arguments +/// * `url` - The HDFS URL (e.g., hdfs://namenode:port/path/to/file) +/// * `data` - The bytes to write to the file +/// +/// # Returns +/// * `Ok(())` on success +/// * `Err(object_store::Error)` on failure +/// +/// # Example +/// ```ignore +/// use url::Url; +/// use bytes::Bytes; +/// +/// let url = Url::parse("hdfs://namenode:9000/path/to/file.parquet")?; +/// let data = Bytes::from("file contents"); +/// write_to_hdfs_with_opendal_async(&url, data).await?; +/// ``` +#[cfg(feature = "hdfs-opendal")] +pub async fn write_to_hdfs_with_opendal_async( + url: &Url, + data: bytes::Bytes, +) -> Result<(), object_store::Error> { + // Create the HDFS object store using OpenDAL + let (object_store, path) = create_hdfs_object_store(url)?; + + // Use the ObjectStore trait's put method to write the data + object_store.put(&path, data.into()).await?; + + Ok(()) +} + +/// Stub implementation when hdfs-opendal feature is not enabled +#[cfg(not(feature = "hdfs-opendal"))] +pub async fn write_to_hdfs_with_opendal_async( + _url: &Url, + _data: bytes::Bytes, +) -> Result<(), object_store::Error> { + Err(object_store::Error::Generic { + store: "hdfs-opendal", + source: "HDFS OpenDAL support is not enabled in this build".into(), + }) +} + #[cfg(feature = "hdfs-opendal")] fn get_name_node_uri(url: &Url) -> Result { use std::fmt::Write; @@ -422,8 +472,11 @@ fn get_name_node_uri(url: &Url) -> Result { } } +// Stub implementation when HDFS support is not enabled #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] -fn parse_hdfs_url(_url: &Url) -> Result<(Box, Path), object_store::Error> { +fn create_hdfs_object_store( + _url: &Url, +) -> Result<(Box, Path), object_store::Error> { Err(object_store::Error::Generic { store: "HadoopFileSystem", source: "Hdfs support is not enabled in this build".into(), @@ -454,7 +507,7 @@ pub(crate) fn prepare_object_store_with_configs( ); let (object_store, object_store_path): (Box, Path) = if is_hdfs_scheme { - parse_hdfs_url(&url) + create_hdfs_object_store(&url) } else if scheme == "s3" { objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300)) } else { From 30eb5819d140233c6ada99cb9ed5e4ae2ba55875 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 18 Dec 2025 09:20:56 -0800 Subject: [PATCH 02/11] WIP: Add support for remote Parquet writer with openDAL --- .../src/execution/operators/parquet_writer.rs | 105 ++++++++++++++++-- native/core/src/parquet/parquet_support.rs | 101 ++++++++++++----- .../operator/CometDataWritingCommand.scala | 5 +- 3 files changed, 170 insertions(+), 41 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 6417ba1560..da97d89f5f 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ 
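For orientation, a minimal self-contained sketch of how the write_to_hdfs_with_opendal_async helper introduced above fits together end to end: batches are first encoded into an in-memory Parquet buffer with ArrowWriter, and the completed bytes are then uploaded in a single ObjectStore::put call. The function name upload_example, the namenode URL, and the output path are placeholder assumptions; a reachable HDFS cluster and the hdfs-opendal feature are required.

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use url::Url;

// Import path as used elsewhere in this patch.
use crate::parquet::parquet_support::write_to_hdfs_with_opendal_async;

// Hypothetical example mirroring the buffer-then-put flow above; not part of the patch itself.
async fn upload_example() -> Result<(), Box<dyn std::error::Error>> {
    // Build a small batch to encode.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Encode the whole file into memory; `put` uploads one complete object.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Placeholder namenode and path.
    let url = Url::parse("hdfs://namenode:9000/tmp/example.parquet")?;
    write_to_hdfs_with_opendal_async(&url, Bytes::from(buf)).await?;
    Ok(())
}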
b/native/core/src/execution/operators/parquet_writer.rs @@ -54,6 +54,9 @@ use parquet::{ use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::write_to_hdfs_with_opendal_async; +#[cfg(all(test, feature = "hdfs-opendal"))] +use crate::parquet::parquet_support::write_record_batch_to_hdfs; + /// Enum representing different types of Arrow writers based on storage backend enum ParquetWriter { /// Writer for local file system @@ -65,7 +68,10 @@ enum ParquetWriter { impl ParquetWriter { /// Write a RecordBatch to the underlying writer - async fn write(&mut self, batch: &RecordBatch) -> std::result::Result<(), parquet::errors::ParquetError> { + async fn write( + &mut self, + batch: &RecordBatch, + ) -> std::result::Result<(), parquet::errors::ParquetError> { match self { ParquetWriter::LocalFile(writer) => writer.write(batch), ParquetWriter::Remote(writer, output_path) => { @@ -84,7 +90,7 @@ impl ParquetWriter { output_path, e )) })?; - + write_to_hdfs_with_opendal_async(&url, Bytes::from(buffer)) .await .map_err(|e| { @@ -99,12 +105,14 @@ impl ParquetWriter { cursor.set_position(0); Ok(()) - }, + } } } /// Close the writer and return the buffer for remote writers - fn close(self) -> std::result::Result, String)>, parquet::errors::ParquetError> { + fn close( + self, + ) -> std::result::Result, String)>, parquet::errors::ParquetError> { match self { ParquetWriter::LocalFile(writer) => { writer.close()?; @@ -211,10 +219,12 @@ impl ParquetWriterExec { // For remote storage (HDFS, S3), write to an in-memory buffer let buffer = Vec::new(); let cursor = Cursor::new(buffer); - let writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| DataFusionError::Execution(format!( - "Failed to create {} writer: {}", storage_scheme, e - )))?; + let writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create {} writer: {}", + storage_scheme, e + )) + })?; Ok(ParquetWriter::Remote(writer, output_file_path.to_string())) } "local" => { @@ -232,12 +242,14 @@ impl ParquetWriterExec { )) })?; - let writer = ArrowWriter::try_new(file, schema, Some(props)) - .map_err(|e| DataFusionError::Execution(format!("Failed to create local file writer: {}", e)))?; + let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create local file writer: {}", e)) + })?; Ok(ParquetWriter::LocalFile(writer)) } _ => Err(DataFusionError::Execution(format!( - "Unsupported storage scheme: {}", storage_scheme + "Unsupported storage scheme: {}", + storage_scheme ))), } } @@ -379,7 +391,12 @@ impl ExecutionPlan for ParquetWriterExec { .set_compression(compression) .build(); - let mut writer = Self::create_arrow_writer(storage_scheme, &part_file, Arc::clone(&output_schema), props)?; + let mut writer = Self::create_arrow_writer( + storage_scheme, + &part_file, + Arc::clone(&output_schema), + props, + )?; // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); @@ -444,3 +461,67 @@ impl ExecutionPlan for ParquetWriterExec { ))) } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data + fn create_test_record_batch() -> Result { + let num_rows = 1000; + + // Create int column with values 0..1000 + let int_array = 
Int32Array::from_iter_values(0..num_rows); + + // Create string column with values "value_0", "value_1", ..., "value_999" + let string_values: Vec = (0..num_rows) + .map(|i| format!("value_{}", i)) + .collect(); + let string_array = StringArray::from(string_values); + + // Define schema + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + // Create RecordBatch + RecordBatch::try_new( + schema, + vec![Arc::new(int_array), Arc::new(string_array)], + ) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + } + + #[tokio::test] + //#[cfg(feature = "hdfs-opendal")] + async fn test_write_to_hdfs() -> Result<()> { + use opendal::services::Hdfs; + use opendal::Operator; + + // Create test data + let batch = create_test_record_batch()?; + + // Configure HDFS connection + let namenode = "hdfs://namenode:9000"; + let output_path = "/user/test_write/data.parquet"; + + // Create OpenDAL HDFS operator + let builder = Hdfs::default().name_node(namenode); + let op = Operator::new(builder) + .map_err(|e| DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)))? + .finish(); + + // Write the batch using write_record_batch_to_hdfs + write_record_batch_to_hdfs(&op, output_path, batch) + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to write to HDFS: {}", e)))?; + + println!("Successfully wrote 1000 rows to HDFS at {}{}", namenode, output_path); + + Ok(()) + } +} diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 9b3e9c3c5a..2bffb11e7b 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -406,35 +406,82 @@ fn create_hdfs_object_store( Ok((Box::new(store), path)) } -/// Writes data to HDFS using OpenDAL via ObjectStore trait (asynchronous version) -/// -/// # Arguments -/// * `url` - The HDFS URL (e.g., hdfs://namenode:port/path/to/file) -/// * `data` - The bytes to write to the file -/// -/// # Returns -/// * `Ok(())` on success -/// * `Err(object_store::Error)` on failure -/// -/// # Example -/// ```ignore -/// use url::Url; -/// use bytes::Bytes; -/// -/// let url = Url::parse("hdfs://namenode:9000/path/to/file.parquet")?; -/// let data = Bytes::from("file contents"); -/// write_to_hdfs_with_opendal_async(&url, data).await?; -/// ``` +use tokio::sync::mpsc; + +struct ChannelWriter { + sender: mpsc::Sender>, +} + +impl std::io::Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.sender + .blocking_send(buf.to_vec()) + .map_err(|_| { + std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed") + })?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + #[cfg(feature = "hdfs-opendal")] -pub async fn write_to_hdfs_with_opendal_async( - url: &Url, - data: bytes::Bytes, -) -> Result<(), object_store::Error> { - // Create the HDFS object store using OpenDAL - let (object_store, path) = create_hdfs_object_store(url)?; +use opendal::Operator; +use parquet::arrow::ArrowWriter; +use arrow::record_batch::RecordBatch; +use parquet::file::properties::WriterProperties; + +#[cfg(feature = "hdfs-opendal")] +pub async fn write_record_batch_to_hdfs( + op: &Operator, + path: &str, + batch: RecordBatch, +) -> Result<(), opendal::Error> { + // ------------------------------------------------------------ + // 1. 
Open async HDFS writer + // ------------------------------------------------------------ + let mut hdfs_writer = op.writer(path).await?; + + // ------------------------------------------------------------ + // 2. Channel between blocking and async worlds + // ------------------------------------------------------------ + let (tx, mut rx) = mpsc::channel::>(8); + + let schema = batch.schema(); + + // ------------------------------------------------------------ + // 3. Blocking Parquet writer + // ------------------------------------------------------------ + let parquet_task = tokio::task::spawn_blocking(move || -> Result<(), Box> { + let props = WriterProperties::builder().build(); + let channel_writer = ChannelWriter { sender: tx }; + + let mut writer = ArrowWriter::try_new( + channel_writer, + schema, + Some(props), + )?; + + writer.write(&batch)?; + writer.close()?; // important to flush remaining data + + Ok(()) + }); + + // ------------------------------------------------------------ + // 4. Async HDFS consumer + // ------------------------------------------------------------ + while let Some(chunk) = rx.recv().await { + hdfs_writer.write(chunk).await?; + } - // Use the ObjectStore trait's put method to write the data - object_store.put(&path, data.into()).await?; + hdfs_writer.close().await?; + parquet_task + .await + .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("task join failed: {}", e)))? + .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("parquet write failed: {}", e)))?; Ok(()) } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 7fdf055217..8349329841 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -52,8 +52,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case cmd: InsertIntoHadoopFsRelationCommand => cmd.fileFormat match { case _: ParquetFileFormat => - if (!cmd.outputPath.toString.startsWith("file:")) { - return Unsupported(Some("Only local filesystem output paths are supported")) + if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString + .startsWith("hdfs:")) { + return Unsupported(Some("Only HDFS/local filesystems output paths are supported")) } if (cmd.bucketSpec.isDefined) { From 89e5c6ac6364a6f05fb5eac8459e22141d18e50d Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 18 Dec 2025 15:49:37 -0800 Subject: [PATCH 03/11] WIP: Add support for remote Parquet writer with openDAL --- native/core/Cargo.toml | 2 +- .../src/execution/operators/parquet_writer.rs | 335 ++++++++++++++---- native/core/src/parquet/parquet_support.rs | 103 +++--- 3 files changed, 319 insertions(+), 121 deletions(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index c79d603095..1a409464d3 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -99,7 +99,7 @@ datafusion-functions-nested = { version = "51.0.0" } [features] backtrace = ["datafusion/backtrace"] -default = [] +default = ["hdfs-opendal"] hdfs = ["datafusion-comet-objectstore-hdfs"] hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 
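The streaming pattern the writer code converges on can be condensed as follows: keep an ArrowWriter over an in-memory cursor, flush after each batch, ship the newly encoded bytes to an opendal Writer, and send the footer produced by into_inner last. The function name stream_batches, the namenode URL, and the output path below are placeholder assumptions; a running HDFS cluster and the hdfs-opendal feature are required.

use std::io::Cursor;

use arrow::record_batch::RecordBatch;
use opendal::{services::Hdfs, Operator};
use parquet::arrow::ArrowWriter;

// Hypothetical sketch of the incremental upload pattern; not part of the patch itself.
async fn stream_batches(batches: &[RecordBatch]) -> Result<(), Box<dyn std::error::Error>> {
    let Some(first) = batches.first() else {
        return Ok(());
    };

    // Placeholder namenode and output path.
    let op = Operator::new(Hdfs::default().name_node("hdfs://namenode:9000"))?.finish();
    let mut remote = op.writer("/tmp/streamed.parquet").await?;

    let mut encoder = ArrowWriter::try_new(Cursor::new(Vec::new()), first.schema(), None)?;
    for batch in batches {
        encoder.write(batch)?;
        // Force the current row group out so its bytes land in the cursor.
        encoder.flush()?;
        let cursor = encoder.inner_mut();
        // Drain the encoded bytes so memory stays bounded between batches.
        let chunk = std::mem::take(cursor.get_mut());
        cursor.set_position(0);
        // The opendal Writer appends successive chunks until close() finalizes the file.
        remote.write(chunk).await?;
    }

    // Closing the ArrowWriter emits the Parquet footer; ship it and finalize the remote file.
    let footer = encoder.into_inner()?.into_inner();
    remote.write(footer).await?;
    remote.close().await?;
    Ok(())
}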
da97d89f5f..4cc1c8b016 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -27,6 +27,7 @@ use std::{ }; use bytes::Bytes; +use opendal::{services::Hdfs, Operator}; use url::Url; use arrow::datatypes::{Schema, SchemaRef}; @@ -52,7 +53,6 @@ use parquet::{ }; use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::write_to_hdfs_with_opendal_async; #[cfg(all(test, feature = "hdfs-opendal"))] use crate::parquet::parquet_support::write_record_batch_to_hdfs; @@ -62,8 +62,9 @@ enum ParquetWriter { /// Writer for local file system LocalFile(ArrowWriter), /// Writer for HDFS or other remote storage (writes to in-memory buffer) - /// Contains the writer and the destination HDFS path - Remote(ArrowWriter>>, String), + /// Contains the arrow writer, HDFS operator, and destination path + /// The opendal::Writer is created lazily on first write + Remote(ArrowWriter>>, Option, Operator, String), } impl ParquetWriter { @@ -74,31 +75,33 @@ impl ParquetWriter { ) -> std::result::Result<(), parquet::errors::ParquetError> { match self { ParquetWriter::LocalFile(writer) => writer.write(batch), - ParquetWriter::Remote(writer, output_path) => { + ParquetWriter::Remote(arrow_parquet_buffer_writer, hdfs_writer_opt, op, output_path) => { // Write batch to in-memory buffer - writer.write(batch)?; + arrow_parquet_buffer_writer.write(batch)?; // Flush and get the current buffer content - writer.flush()?; - let cursor = writer.inner_mut(); - let buffer = cursor.get_ref().clone(); - - // Upload - let url = Url::parse(output_path).map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to parse URL '{}': {}", - output_path, e - )) - })?; - - write_to_hdfs_with_opendal_async(&url, Bytes::from(buffer)) - .await - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to upload to '{}': {}", - output_path, e - )) + arrow_parquet_buffer_writer.flush()?; + let cursor = arrow_parquet_buffer_writer.inner_mut(); + let current_data = cursor.get_ref().clone(); + + // Create HDFS writer lazily on first write + if hdfs_writer_opt.is_none() { + let writer = op.writer(output_path.as_str()).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to create HDFS writer: {}", e).into() + ) + })?; + *hdfs_writer_opt = Some(writer); + } + + // Write the accumulated data to HDFS + if let Some(hdfs_writer) = hdfs_writer_opt { + hdfs_writer.write(current_data).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to write batch to HDFS: {}", e).into() + ) })?; + } // Clear the buffer after upload cursor.get_mut().clear(); @@ -109,18 +112,49 @@ impl ParquetWriter { } } - /// Close the writer and return the buffer for remote writers - fn close( + /// Close the writer and finalize the file + async fn close( self, - ) -> std::result::Result, String)>, parquet::errors::ParquetError> { + ) -> std::result::Result<(), parquet::errors::ParquetError> { match self { ParquetWriter::LocalFile(writer) => { writer.close()?; - Ok(None) + Ok(()) } - ParquetWriter::Remote(writer, path) => { - let cursor = writer.into_inner()?; - Ok(Some((cursor.into_inner(), path))) + ParquetWriter::Remote(arrow_parquet_buffer_writer, mut hdfs_writer_opt, op, output_path) => { + // Close the arrow writer to finalize parquet format + let cursor = arrow_parquet_buffer_writer.into_inner()?; + let final_data = cursor.into_inner(); + + // Create HDFS writer if not already created + if 
hdfs_writer_opt.is_none() && !final_data.is_empty() { + let writer = op.writer(output_path.as_str()).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to create HDFS writer: {}", e).into() + ) + })?; + hdfs_writer_opt = Some(writer); + } + + // Write any remaining data + if !final_data.is_empty() { + if let Some(mut hdfs_writer) = hdfs_writer_opt { + hdfs_writer.write(final_data).await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to write final data to HDFS: {}", e).into() + ) + })?; + + // Close the HDFS writer + hdfs_writer.close().await.map_err(|e| { + parquet::errors::ParquetError::External( + format!("Failed to close HDFS writer: {}", e).into() + ) + })?; + } + } + + Ok(()) } } } @@ -219,13 +253,28 @@ impl ParquetWriterExec { // For remote storage (HDFS, S3), write to an in-memory buffer let buffer = Vec::new(); let cursor = Cursor::new(buffer); - let writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create {} writer: {}", - storage_scheme, e - )) - })?; - Ok(ParquetWriter::Remote(writer, output_file_path.to_string())) + let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create {} writer: {}", + storage_scheme, e + )) + })?; + + let builder = Hdfs::default().name_node("hdfs://namenode:9000"); + let op = Operator::new(builder) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)) + })? + .finish(); + + // HDFS writer will be created lazily on first write + Ok(ParquetWriter::Remote( + arrow_parquet_buffer_writer, + None, + op, + output_file_path.to_string(), + )) } "local" => { // For local file system, write directly to file @@ -430,7 +479,7 @@ impl ExecutionPlan for ParquetWriterExec { })?; } - writer.close().map_err(|e| { + writer.close().await.map_err(|e| { DataFusionError::Execution(format!("Failed to close writer: {}", e)) })?; @@ -467,61 +516,205 @@ mod tests { use super::*; use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch; use std::sync::Arc; /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data - fn create_test_record_batch() -> Result { - let num_rows = 1000; - - // Create int column with values 0..1000 - let int_array = Int32Array::from_iter_values(0..num_rows); - - // Create string column with values "value_0", "value_1", ..., "value_999" - let string_values: Vec = (0..num_rows) + fn create_test_record_batch(batch_id: i32) -> Result { + assert!(batch_id > 0, "batch_id must be greater than 0"); + let num_rows = batch_id * 1000; + + let int_array = Int32Array::from_iter_values(((batch_id - 1) * 1000)..num_rows); + + let string_values: Vec = (((batch_id - 1) * 1000)..num_rows) .map(|i| format!("value_{}", i)) .collect(); let string_array = StringArray::from(string_values); - + // Define schema let schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, false), ])); - + // Create RecordBatch - RecordBatch::try_new( - schema, - vec![Arc::new(int_array), Arc::new(string_array)], - ) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + RecordBatch::try_new(schema, vec![Arc::new(int_array), Arc::new(string_array)]) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) } #[tokio::test] - //#[cfg(feature = "hdfs-opendal")] - async fn 
test_write_to_hdfs() -> Result<()> { + #[cfg(feature = "hdfs-opendal")] + async fn test_write_to_hdfs_sync() -> Result<()> { use opendal::services::Hdfs; use opendal::Operator; - - // Create test data - let batch = create_test_record_batch()?; - + // Configure HDFS connection let namenode = "hdfs://namenode:9000"; let output_path = "/user/test_write/data.parquet"; - + + // Create OpenDAL HDFS operator + let builder = Hdfs::default().name_node(namenode); + let op = Operator::new(builder) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)) + })? + .finish(); + + let mut hdfs_writer = op.writer(output_path).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) + })?; + + let mut buffer = Cursor::new(Vec::new()); + let mut writer = + ArrowWriter::try_new(&mut buffer, create_test_record_batch(1)?.schema(), None)?; + + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + writer.write(&record_batch)?; + + println!( + "Successfully wrote 1000 rows to HDFS at {}{}", + namenode, output_path + ); + } + + writer.close()?; + + hdfs_writer.write(buffer.into_inner()).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write with HDFS writer: {}", e)) + })?; + + hdfs_writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close HDFS writer: {}", e)) + })?; + + Ok(()) + } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + async fn test_write_to_hdfs_streaming() -> Result<()> { + use opendal::services::Hdfs; + use opendal::Operator; + + // Configure HDFS connection + let namenode = "hdfs://namenode:9000"; + let output_path = "/user/test_write_streaming/data.parquet"; + // Create OpenDAL HDFS operator let builder = Hdfs::default().name_node(namenode); let op = Operator::new(builder) - .map_err(|e| DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)))? + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)) + })? 
.finish(); - - // Write the batch using write_record_batch_to_hdfs - write_record_batch_to_hdfs(&op, output_path, batch) - .await - .map_err(|e| DataFusionError::Execution(format!("Failed to write to HDFS: {}", e)))?; - - println!("Successfully wrote 1000 rows to HDFS at {}{}", namenode, output_path); - + + // Create a single HDFS writer for the entire file + let mut hdfs_writer = op.writer(output_path).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) + })?; + + // Create a single ArrowWriter that will be used for all batches + let buffer = Cursor::new(Vec::new()); + let mut writer = ArrowWriter::try_new(buffer, create_test_record_batch(1)?.schema(), None)?; + + // Write each batch and upload to HDFS immediately (streaming approach) + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + // Write the batch to the parquet writer + writer.write(&record_batch)?; + + // Flush the writer to ensure data is written to the buffer + writer.flush()?; + + // Get the current buffer content through the writer + let cursor = writer.inner_mut(); + let current_data = cursor.get_ref().clone(); + + // Write the accumulated data to HDFS + hdfs_writer.write(current_data).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch {} to HDFS: {}", i, e)) + })?; + + // Clear the buffer for the next iteration + cursor.get_mut().clear(); + cursor.set_position(0); + + println!( + "Successfully streamed batch {} (1000 rows) to HDFS at {}{}", + i, namenode, output_path + ); + } + + // Close the ArrowWriter to finalize the parquet file + let cursor = writer.into_inner()?; + + // Write any remaining data from closing the writer + let final_data = cursor.into_inner(); + if !final_data.is_empty() { + hdfs_writer.write(final_data).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write final data to HDFS: {}", e)) + })?; + } + + // Close the HDFS writer + hdfs_writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close HDFS writer: {}", e)) + })?; + + println!( + "Successfully completed streaming write of 5 batches (5000 total rows) to HDFS at {}{}", + namenode, output_path + ); + + Ok(()) + } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + async fn test_parquet_writer_streaming() -> Result<()> { + // Configure output path + let output_path = "/user/test_parquet_writer_streaming/data.parquet"; + + // Configure writer properties + let props = WriterProperties::builder() + .set_compression(Compression::UNCOMPRESSED) + .build(); + + // Create ParquetWriter using the create_arrow_writer method + let mut writer = ParquetWriterExec::create_arrow_writer( + "hdfs", + output_path, + create_test_record_batch(1)?.schema(), + props, + )?; + + // Write 5 batches in a loop + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + writer.write(&record_batch).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) + })?; + + println!( + "Successfully wrote batch {} (1000 rows) using ParquetWriter", + i + ); + } + + // Close the writer + writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close writer: {}", e)) + })?; + + println!( + "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}", + output_path + ); + Ok(()) - } + } } diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 2bffb11e7b..cb062d18e7 100644 --- 
a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -29,6 +29,7 @@ use arrow::{ datatypes::{DataType, TimeUnit}, util::display::FormatOptions, }; +use bytes::BytesMut; use datafusion::common::{Result as DataFusionResult, ScalarValue}; use datafusion::error::DataFusionError; use datafusion::execution::object_store::ObjectStoreUrl; @@ -38,6 +39,7 @@ use datafusion_comet_spark_expr::EvalMode; use object_store::path::Path; use object_store::{parse_url, ObjectStore}; use std::collections::HashMap; +use std::io::Cursor; use std::time::Duration; use std::{fmt::Debug, hash::Hash, sync::Arc}; use url::Url; @@ -416,9 +418,7 @@ impl std::io::Write for ChannelWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.sender .blocking_send(buf.to_vec()) - .map_err(|_| { - std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed") - })?; + .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed"))?; Ok(buf.len()) } @@ -427,61 +427,66 @@ impl std::io::Write for ChannelWriter { } } -#[cfg(feature = "hdfs-opendal")] -use opendal::Operator; -use parquet::arrow::ArrowWriter; +//#[cfg(feature = "hdfs-opendal")] use arrow::record_batch::RecordBatch; +use opendal::Writer; +use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; -#[cfg(feature = "hdfs-opendal")] +//#[cfg(feature = "hdfs-opendal")] pub async fn write_record_batch_to_hdfs( - op: &Operator, - path: &str, + hdfs_writer: &mut Writer, batch: RecordBatch, ) -> Result<(), opendal::Error> { - // ------------------------------------------------------------ - // 1. Open async HDFS writer - // ------------------------------------------------------------ - let mut hdfs_writer = op.writer(path).await?; - // ------------------------------------------------------------ // 2. Channel between blocking and async worlds // ------------------------------------------------------------ - let (tx, mut rx) = mpsc::channel::>(8); - - let schema = batch.schema(); - - // ------------------------------------------------------------ - // 3. Blocking Parquet writer - // ------------------------------------------------------------ - let parquet_task = tokio::task::spawn_blocking(move || -> Result<(), Box> { - let props = WriterProperties::builder().build(); - let channel_writer = ChannelWriter { sender: tx }; - - let mut writer = ArrowWriter::try_new( - channel_writer, - schema, - Some(props), - )?; - - writer.write(&batch)?; - writer.close()?; // important to flush remaining data - - Ok(()) - }); - - // ------------------------------------------------------------ - // 4. Async HDFS consumer - // ------------------------------------------------------------ - while let Some(chunk) = rx.recv().await { - hdfs_writer.write(chunk).await?; - } - - hdfs_writer.close().await?; - parquet_task - .await - .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("task join failed: {}", e)))? 
- .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("parquet write failed: {}", e)))?; + //let (tx, mut rx) = mpsc::channel::>(8); + + //let schema = batch.schema(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); + writer.write(&batch).map_err(|e| { + opendal::Error::new( + opendal::ErrorKind::Unexpected, + &format!("write failed: {}", e), + ) + })?; + + writer.close().map_err(|e| { + opendal::Error::new( + opendal::ErrorKind::Unexpected, + &format!("Error closing writer: {}", e), + ) + })?; + hdfs_writer.write(buffer).await?; + + // let parquet_task = tokio::task::spawn_blocking(move || -> Result<(), Box> { + // let props = WriterProperties::builder().build(); + // let channel_writer = ChannelWriter { sender: tx }; + + // let mut writer = ArrowWriter::try_new( + // channel_writer, + // schema, + // Some(props), + // )?; + + // writer.write(&batch)?; + + // writer.close()?; // important to flush remaining data + + // Ok(()) + // }); + + // while let Some(chunk) = rx.recv().await { + // hdfs_writer.write(chunk).await?; + // } + + // parquet_task + // .await + // .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("task join failed: {}", e)))? + // .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("parquet write failed: {}", e)))?; Ok(()) } From c70d3c7bf099f26b01ed5319fa8713c8fdf83142 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 18 Dec 2025 17:42:30 -0800 Subject: [PATCH 04/11] [WIP] opendal writes --- .../src/execution/operators/parquet_writer.rs | 210 +++++++++++++----- native/core/src/parquet/parquet_support.rs | 85 ------- 2 files changed, 151 insertions(+), 144 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 4cc1c8b016..04879bf980 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -26,7 +26,6 @@ use std::{ sync::Arc, }; -use bytes::Bytes; use opendal::{services::Hdfs, Operator}; use url::Url; @@ -54,17 +53,20 @@ use parquet::{ use crate::execution::shuffle::CompressionCodec; -#[cfg(all(test, feature = "hdfs-opendal"))] -use crate::parquet::parquet_support::write_record_batch_to_hdfs; - /// Enum representing different types of Arrow writers based on storage backend enum ParquetWriter { /// Writer for local file system LocalFile(ArrowWriter), /// Writer for HDFS or other remote storage (writes to in-memory buffer) /// Contains the arrow writer, HDFS operator, and destination path + /// an Arrow writer writes to in-memory buffer the data converted to Parquet format /// The opendal::Writer is created lazily on first write - Remote(ArrowWriter>>, Option, Operator, String), + Remote( + ArrowWriter>>, + Option, + Operator, + String, + ), } impl ParquetWriter { @@ -75,7 +77,12 @@ impl ParquetWriter { ) -> std::result::Result<(), parquet::errors::ParquetError> { match self { ParquetWriter::LocalFile(writer) => writer.write(batch), - ParquetWriter::Remote(arrow_parquet_buffer_writer, hdfs_writer_opt, op, output_path) => { + ParquetWriter::Remote( + arrow_parquet_buffer_writer, + hdfs_writer_opt, + op, + output_path, + ) => { // Write batch to in-memory buffer arrow_parquet_buffer_writer.write(batch)?; @@ -88,7 +95,8 @@ impl ParquetWriter { if hdfs_writer_opt.is_none() { let writer = op.writer(output_path.as_str()).await.map_err(|e| { parquet::errors::ParquetError::External( - 
format!("Failed to create HDFS writer: {}", e).into() + format!("Failed to create HDFS writer for '{}': {}", output_path, e) + .into(), ) })?; *hdfs_writer_opt = Some(writer); @@ -98,7 +106,11 @@ impl ParquetWriter { if let Some(hdfs_writer) = hdfs_writer_opt { hdfs_writer.write(current_data).await.map_err(|e| { parquet::errors::ParquetError::External( - format!("Failed to write batch to HDFS: {}", e).into() + format!( + "Failed to write batch to HDFS file '{}': {}", + output_path, e + ) + .into(), ) })?; } @@ -113,15 +125,18 @@ impl ParquetWriter { } /// Close the writer and finalize the file - async fn close( - self, - ) -> std::result::Result<(), parquet::errors::ParquetError> { + async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> { match self { ParquetWriter::LocalFile(writer) => { writer.close()?; Ok(()) } - ParquetWriter::Remote(arrow_parquet_buffer_writer, mut hdfs_writer_opt, op, output_path) => { + ParquetWriter::Remote( + arrow_parquet_buffer_writer, + mut hdfs_writer_opt, + op, + output_path, + ) => { // Close the arrow writer to finalize parquet format let cursor = arrow_parquet_buffer_writer.into_inner()?; let final_data = cursor.into_inner(); @@ -130,7 +145,8 @@ impl ParquetWriter { if hdfs_writer_opt.is_none() && !final_data.is_empty() { let writer = op.writer(output_path.as_str()).await.map_err(|e| { parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer: {}", e).into() + format!("Failed to create HDFS writer for '{}': {}", output_path, e) + .into(), ) })?; hdfs_writer_opt = Some(writer); @@ -141,14 +157,19 @@ impl ParquetWriter { if let Some(mut hdfs_writer) = hdfs_writer_opt { hdfs_writer.write(final_data).await.map_err(|e| { parquet::errors::ParquetError::External( - format!("Failed to write final data to HDFS: {}", e).into() + format!( + "Failed to write final data to HDFS file '{}': {}", + output_path, e + ) + .into(), ) })?; // Close the HDFS writer hdfs_writer.close().await.map_err(|e| { parquet::errors::ParquetError::External( - format!("Failed to close HDFS writer: {}", e).into() + format!("Failed to close HDFS writer for '{}': {}", output_path, e) + .into(), ) })?; } @@ -243,13 +264,43 @@ impl ParquetWriterExec { /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme /// * `Err(DataFusionError)` - If writer creation fails fn create_arrow_writer( - storage_scheme: &str, output_file_path: &str, schema: SchemaRef, props: WriterProperties, ) -> Result { + // Determine storage scheme from output_file_path + let storage_scheme = if output_file_path.starts_with("hdfs://") { + "hdfs" + } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") { + "s3" + } else { + "local" + }; + match storage_scheme { - "hdfs" | "s3" => { + "hdfs" => { + // Parse the output_file_path to extract namenode and path + // Expected format: hdfs://namenode:port/path/to/file + let url = Url::parse(output_file_path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to parse HDFS URL '{}': {}", + output_file_path, e + )) + })?; + + // Extract namenode (scheme + host + port) + let namenode = format!( + "{}://{}{}", + url.scheme(), + url.host_str().unwrap_or("localhost"), + url.port() + .map(|p| format!(":{}", p)) + .unwrap_or_else(|| ":9000".to_string()) + ); + + // Extract the path (without the scheme and host) + let hdfs_path = url.path().to_string(); + // For remote storage (HDFS, S3), write to an in-memory buffer let buffer = Vec::new(); let cursor = Cursor::new(buffer); @@ -261,19 
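The URL handling above splits an hdfs:// output path into a namenode endpoint (defaulting the port to 9000) and the file path handed to the opendal writer. A standalone illustration of that split, using the same url crate calls; the helper name and sample values are placeholders:

use url::Url;

// Hypothetical helper mirroring the namenode/path split performed above.
fn split_hdfs_url(output: &str) -> Result<(String, String), url::ParseError> {
    let url = Url::parse(output)?;
    // Rebuild "scheme://host:port", defaulting the port to 9000 when absent.
    let namenode = format!(
        "{}://{}{}",
        url.scheme(),
        url.host_str().unwrap_or("localhost"),
        url.port()
            .map(|p| format!(":{}", p))
            .unwrap_or_else(|| ":9000".to_string())
    );
    // The remaining path is what the opendal writer receives.
    Ok((namenode, url.path().to_string()))
}

// split_hdfs_url("hdfs://namenode:9000/user/out/part-00000.parquet")
//   == Ok(("hdfs://namenode:9000".to_string(), "/user/out/part-00000.parquet".to_string()))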
+312,23 @@ impl ParquetWriterExec { )) })?; - let builder = Hdfs::default().name_node("hdfs://namenode:9000"); + let builder = Hdfs::default().name_node(&namenode); let op = Operator::new(builder) .map_err(|e| { - DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e)) + DataFusionError::Execution(format!( + "Failed to create HDFS operator for '{}' (namenode: {}): {}", + output_file_path, namenode, e + )) })? .finish(); // HDFS writer will be created lazily on first write + // Use only the path part for the HDFS writer Ok(ParquetWriter::Remote( arrow_parquet_buffer_writer, None, op, - output_file_path.to_string(), + hdfs_path, )) } "local" => { @@ -284,6 +339,14 @@ impl ParquetWriterExec { .or_else(|| output_file_path.strip_prefix("file:")) .unwrap_or(output_file_path); + // Create output directory + std::fs::create_dir_all(&local_path).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output directory '{}': {}", + local_path, e + )) + })?; + let file = File::create(local_path).map_err(|e| { DataFusionError::Execution(format!( "Failed to create output file '{}': {}", @@ -400,39 +463,15 @@ impl ExecutionPlan for ParquetWriterExec { .collect(); let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); - // Determine storage scheme from work_dir - let storage_scheme = if work_dir.starts_with("hdfs://") { - "hdfs" - } else if work_dir.starts_with("s3://") || work_dir.starts_with("s3a://") { - "s3" - } else { - "local" - }; - - // Strip file:// or file: prefix if present - let local_path = work_dir - .strip_prefix("file://") - .or_else(|| work_dir.strip_prefix("file:")) - .unwrap_or(&work_dir) - .to_string(); - - // Create output directory - std::fs::create_dir_all(&local_path).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output directory '{}': {}", - local_path, e - )) - })?; - // Generate part file name for this partition // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename let part_file = if let Some(attempt_id) = task_attempt_id { format!( "{}/part-{:05}-{:05}.parquet", - local_path, self.partition_id, attempt_id + work_dir, self.partition_id, attempt_id ) } else { - format!("{}/part-{:05}.parquet", local_path, self.partition_id) + format!("{}/part-{:05}.parquet", work_dir, self.partition_id) }; // Configure writer properties @@ -440,12 +479,7 @@ impl ExecutionPlan for ParquetWriterExec { .set_compression(compression) .build(); - let mut writer = Self::create_arrow_writer( - storage_scheme, - &part_file, - Arc::clone(&output_schema), - props, - )?; + let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?; // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); @@ -516,7 +550,6 @@ mod tests { use super::*; use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch; use std::sync::Arc; /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data @@ -684,9 +717,10 @@ mod tests { .build(); // Create ParquetWriter using the create_arrow_writer method + // Use full HDFS URL format + let full_output_path = format!("hdfs://namenode:9000{}", output_path); let mut writer = ParquetWriterExec::create_arrow_writer( - "hdfs", - output_path, + &full_output_path, create_test_record_batch(1)?.schema(), props, )?; @@ -694,7 +728,7 @@ mod tests { // Write 5 batches in a loop for i in 1..=5 { let record_batch = 
create_test_record_batch(i)?; - + writer.write(&record_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) })?; @@ -706,9 +740,10 @@ mod tests { } // Close the writer - writer.close().await.map_err(|e| { - DataFusionError::Execution(format!("Failed to close writer: {}", e)) - })?; + writer + .close() + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?; println!( "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}", @@ -717,4 +752,61 @@ mod tests { Ok(()) } + + #[tokio::test] + #[cfg(feature = "hdfs-opendal")] + async fn test_parquet_writer_exec_with_memory_input() -> Result<()> { + use datafusion::datasource::memory::MemorySourceConfig; + use datafusion::datasource::source::DataSourceExec; + use datafusion::prelude::SessionContext; + + // Create 5 batches for the DataSourceExec input + let mut batches = Vec::new(); + for i in 1..=5 { + batches.push(create_test_record_batch(i)?); + } + + // Get schema from the first batch + let schema = batches[0].schema(); + + // Create DataSourceExec with MemorySourceConfig containing the 5 batches as a single partition + let partitions = vec![batches]; + let memory_source_config = MemorySourceConfig::try_new(&partitions, schema, None)?; + let memory_exec = Arc::new(DataSourceExec::new(Arc::new(memory_source_config))); + + // Create ParquetWriterExec with DataSourceExec as input + let output_path = "unused".to_string(); + let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string(); + let column_names = vec!["id".to_string(), "name".to_string()]; + + let parquet_writer = ParquetWriterExec::try_new( + memory_exec, + output_path, + work_dir, + None, // job_id + Some(123), // task_attempt_id + CompressionCodec::None, + 0, // partition_id + column_names, + )?; + + // Create a session context and execute the plan + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + // Execute partition 0 + let mut stream = parquet_writer.execute(0, task_ctx)?; + + // Consume the stream (this triggers the write) + while let Some(batch_result) = stream.try_next().await? 
{ + // The stream should be empty as ParquetWriterExec returns empty batches + assert_eq!(batch_result.num_rows(), 0); + } + + println!( + "Successfully completed ParquetWriterExec test with DataSourceExec input (5 batches, 5000 total rows)" + ); + + Ok(()) + } } diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index cb062d18e7..293b862ff5 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -29,7 +29,6 @@ use arrow::{ datatypes::{DataType, TimeUnit}, util::display::FormatOptions, }; -use bytes::BytesMut; use datafusion::common::{Result as DataFusionResult, ScalarValue}; use datafusion::error::DataFusionError; use datafusion::execution::object_store::ObjectStoreUrl; @@ -39,7 +38,6 @@ use datafusion_comet_spark_expr::EvalMode; use object_store::path::Path; use object_store::{parse_url, ObjectStore}; use std::collections::HashMap; -use std::io::Cursor; use std::time::Duration; use std::{fmt::Debug, hash::Hash, sync::Arc}; use url::Url; @@ -408,89 +406,6 @@ fn create_hdfs_object_store( Ok((Box::new(store), path)) } -use tokio::sync::mpsc; - -struct ChannelWriter { - sender: mpsc::Sender>, -} - -impl std::io::Write for ChannelWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.sender - .blocking_send(buf.to_vec()) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed"))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -//#[cfg(feature = "hdfs-opendal")] -use arrow::record_batch::RecordBatch; -use opendal::Writer; -use parquet::arrow::ArrowWriter; -use parquet::file::properties::WriterProperties; - -//#[cfg(feature = "hdfs-opendal")] -pub async fn write_record_batch_to_hdfs( - hdfs_writer: &mut Writer, - batch: RecordBatch, -) -> Result<(), opendal::Error> { - // ------------------------------------------------------------ - // 2. Channel between blocking and async worlds - // ------------------------------------------------------------ - //let (tx, mut rx) = mpsc::channel::>(8); - - //let schema = batch.schema(); - - let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); - writer.write(&batch).map_err(|e| { - opendal::Error::new( - opendal::ErrorKind::Unexpected, - &format!("write failed: {}", e), - ) - })?; - - writer.close().map_err(|e| { - opendal::Error::new( - opendal::ErrorKind::Unexpected, - &format!("Error closing writer: {}", e), - ) - })?; - hdfs_writer.write(buffer).await?; - - // let parquet_task = tokio::task::spawn_blocking(move || -> Result<(), Box> { - // let props = WriterProperties::builder().build(); - // let channel_writer = ChannelWriter { sender: tx }; - - // let mut writer = ArrowWriter::try_new( - // channel_writer, - // schema, - // Some(props), - // )?; - - // writer.write(&batch)?; - - // writer.close()?; // important to flush remaining data - - // Ok(()) - // }); - - // while let Some(chunk) = rx.recv().await { - // hdfs_writer.write(chunk).await?; - // } - - // parquet_task - // .await - // .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("task join failed: {}", e)))? 
- // .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, &format!("parquet write failed: {}", e)))?; - - Ok(()) -} - /// Stub implementation when hdfs-opendal feature is not enabled #[cfg(not(feature = "hdfs-opendal"))] pub async fn write_to_hdfs_with_opendal_async( From 27c4d54b2468ba5a6a9c16f3df751c15d4f42239 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 18 Dec 2025 21:18:15 -0800 Subject: [PATCH 05/11] [WIP] opendal writes --- native/Cargo.lock | 68 ++++++++----------- .../src/execution/operators/parquet_writer.rs | 2 +- native/core/src/lib.rs | 22 ++++-- native/core/src/parquet/parquet_support.rs | 2 +- native/fs-hdfs/Cargo.toml | 2 +- native/hdfs/Cargo.toml | 4 +- native/hdfs/src/object_store/hdfs.rs | 10 +-- 7 files changed, 55 insertions(+), 55 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index bf9a7ea2da..be936cb4e4 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -598,9 +598,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.15.1" +version = "1.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f" +checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288" dependencies = [ "aws-lc-sys", "zeroize", @@ -608,9 +608,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.34.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6" +checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1" dependencies = [ "cc", "cmake", @@ -789,9 +789,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.8" +version = "0.61.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6864c190cbb8e30cf4b77b2c8f3b6dfffa697a09b7218d2f7cd3d4c4065a9f7" +checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" dependencies = [ "aws-smithy-types", ] @@ -817,9 +817,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" +checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1145,9 +1145,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.0" +version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "bytecheck" @@ -1361,9 +1361,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cmake" -version = "0.1.54" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" dependencies = [ "cc", ] @@ -1852,7 +1852,7 @@ dependencies = [ "async-trait", "bytes", "chrono", - "fs-hdfs", + "datafusion-comet-fs-hdfs3", "fs-hdfs3", "futures", "object_store", @@ -2728,20 +2728,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fs-hdfs" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "25f164ff6334da016dffd1c29a3c05b81c35b857ef829d3fa9e58ae8d3e6f76b" -dependencies = [ - "bindgen 0.64.0", - "cc", - "lazy_static", - "libc", - "log", - "url", -] - [[package]] name = "fs-hdfs3" version = "0.1.12" @@ -5005,9 +4991,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f08d6a905edb32d74a5d5737a0c9d7e950c312f3c46cb0ca0a2ca09ea11878a0" +checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885" dependencies = [ "bytemuck", "byteorder", @@ -5159,9 +5145,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" dependencies = [ "web-time", "zeroize", @@ -5878,18 +5864,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.3" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ "serde_core", ] [[package]] name = "toml_edit" -version = "0.23.9" +version = "0.23.10+spec-1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832" +checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ "indexmap 2.12.1", "toml_datetime", @@ -5899,9 +5885,9 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.4" +version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" dependencies = [ "winnow", ] @@ -5953,9 +5939,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -5975,9 +5961,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", ] diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 04879bf980..85d9cd9de0 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -340,7 +340,7 @@ impl ParquetWriterExec { .unwrap_or(output_file_path); // Create output directory - std::fs::create_dir_all(&local_path).map_err(|e| { + std::fs::create_dir_all(local_path).map_err(|e| { DataFusionError::Execution(format!( "Failed to create output directory '{}': {}", local_path, e diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs 
index 10ecefad5b..2b883bd7df 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -39,10 +39,17 @@ use log4rs::{ }; use once_cell::sync::OnceCell; -#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] +#[cfg(all( + not(target_env = "msvc"), + feature = "jemalloc", + not(feature = "mimalloc") +))] use tikv_jemallocator::Jemalloc; -#[cfg(feature = "mimalloc")] +#[cfg(all( + feature = "mimalloc", + not(all(not(target_env = "msvc"), feature = "jemalloc")) +))] use mimalloc::MiMalloc; use errors::{try_unwrap_or_throw, CometError, CometResult}; @@ -55,11 +62,18 @@ pub mod execution; mod jvm_bridge; pub mod parquet; -#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] +#[cfg(all( + not(target_env = "msvc"), + feature = "jemalloc", + not(feature = "mimalloc") +))] #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; -#[cfg(feature = "mimalloc")] +#[cfg(all( + feature = "mimalloc", + not(all(not(target_env = "msvc"), feature = "jemalloc")) +))] #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 293b862ff5..2f67e0b593 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -370,7 +370,7 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) -> } // Creates an HDFS object store from a URL using the native HDFS implementation -#[cfg(feature = "hdfs")] +#[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))] fn create_hdfs_object_store( url: &Url, ) -> Result<(Box, Path), object_store::Error> { diff --git a/native/fs-hdfs/Cargo.toml b/native/fs-hdfs/Cargo.toml index 98005c860f..a47b69a1f2 100644 --- a/native/fs-hdfs/Cargo.toml +++ b/native/fs-hdfs/Cargo.toml @@ -32,7 +32,7 @@ publish = false readme = "README.md" [lib] -name = "hdfs" +name = "fs_hdfs" path = "src/lib.rs" [features] diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml index dc8f970ef7..c66d71c203 100644 --- a/native/hdfs/Cargo.toml +++ b/native/hdfs/Cargo.toml @@ -33,7 +33,7 @@ edition = { workspace = true } [features] default = ["hdfs", "try_spawn_blocking"] -hdfs = ["fs-hdfs"] +hdfs = ["fs_hdfs"] hdfs3 = ["fs-hdfs3"] # Used for trying to spawn a blocking thread for implementing each object store interface when running in a tokio runtime try_spawn_blocking = [] @@ -42,7 +42,7 @@ try_spawn_blocking = [] async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } -fs-hdfs = { version = "^0.1.12", optional = true } +fs_hdfs = { package = "datafusion-comet-fs-hdfs3", path = "../fs-hdfs", optional = true } fs-hdfs3 = { version = "^0.1.12", optional = true } futures = { workspace = true } object_store = { workspace = true } diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs index b49e879429..a93774cffe 100644 --- a/native/hdfs/src/object_store/hdfs.rs +++ b/native/hdfs/src/object_store/hdfs.rs @@ -26,9 +26,9 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; +use fs_hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs}; +use fs_hdfs::walkdir::HdfsWalkDir; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs}; -use hdfs::walkdir::HdfsWalkDir; use object_store::{ path::{self, Path}, Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, @@ -422,7 +422,7 @@ impl ObjectStore 
for HadoopFileSystem { hdfs.delete(&to, false).map_err(to_error)?; } - hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) + fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) .map_err(to_error)?; Ok(()) @@ -437,7 +437,7 @@ impl ObjectStore for HadoopFileSystem { let to = HadoopFileSystem::path_to_filesystem(to); maybe_spawn_blocking(move || { - hdfs.rename(&from, &to).map_err(to_error)?; + hdfs.rename(&from, &to, true).map_err(to_error)?; Ok(()) }) @@ -459,7 +459,7 @@ impl ObjectStore for HadoopFileSystem { }); } - hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) + fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to) .map_err(to_error)?; Ok(()) From 0529b6636a4dadd89d8bcf5cf953b4e48b803c66 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 23 Dec 2025 20:03:44 -0800 Subject: [PATCH 06/11] WIP: Add support for remote Parquet writer with openDAL --- native/core/src/execution/operators/parquet_writer.rs | 5 +++++ native/core/src/parquet/parquet_support.rs | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 85d9cd9de0..c988db11c4 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -553,6 +553,7 @@ mod tests { use std::sync::Arc; /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data + /// Example batch_id 1 -> 0..1000, 2 -> 1001..2000 fn create_test_record_batch(batch_id: i32) -> Result { assert!(batch_id > 0, "batch_id must be greater than 0"); let num_rows = batch_id * 1000; @@ -577,6 +578,7 @@ mod tests { #[tokio::test] #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] async fn test_write_to_hdfs_sync() -> Result<()> { use opendal::services::Hdfs; use opendal::Operator; @@ -627,6 +629,7 @@ mod tests { #[tokio::test] #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] async fn test_write_to_hdfs_streaming() -> Result<()> { use opendal::services::Hdfs; use opendal::Operator; @@ -707,6 +710,7 @@ mod tests { #[tokio::test] #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] async fn test_parquet_writer_streaming() -> Result<()> { // Configure output path let output_path = "/user/test_parquet_writer_streaming/data.parquet"; @@ -755,6 +759,7 @@ mod tests { #[tokio::test] #[cfg(feature = "hdfs-opendal")] + #[ignore = "This test requires a running HDFS cluster"] async fn test_parquet_writer_exec_with_memory_input() -> Result<()> { use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 2f67e0b593..2a6ec3550c 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -496,9 +496,9 @@ mod tests { use object_store::path::Path; use std::collections::HashMap; use std::sync::Arc; - use url::Url; /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] pub(crate) fn prepare_object_store( runtime_env: Arc, url: String, @@ -506,10 +506,11 @@ mod tests { prepare_object_store_with_configs(runtime_env, url, &HashMap::new()) } - #[cfg(not(feature = 
"hdfs"))] + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] #[test] fn test_prepare_object_store() { use crate::execution::operators::ExecutionError; + use url::Url; let local_file_system_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet"; let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet"; From da9ebd32c094cfabf107dcb858a6fd365b219e38 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 23 Dec 2025 20:28:15 -0800 Subject: [PATCH 07/11] [WIP] opendal writes --- native/core/src/parquet/parquet_support.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 2a6ec3550c..da8f10d67e 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -489,13 +489,16 @@ pub(crate) fn prepare_object_store_with_configs( #[cfg(test)] mod tests { - use crate::execution::operators::ExecutionError; - use crate::parquet::parquet_support::prepare_object_store_with_configs; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::execution::runtime_env::RuntimeEnv; use object_store::path::Path; - use std::collections::HashMap; use std::sync::Arc; + use url::Url; + + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] + use crate::execution::operators::ExecutionError; + #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] + use std::collections::HashMap; /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] @@ -503,6 +506,18 @@ mod tests { runtime_env: Arc, url: String, ) -> Result<(ObjectStoreUrl, Path), ExecutionError> { + use crate::parquet::parquet_support::prepare_object_store_with_configs; + prepare_object_store_with_configs(runtime_env, url, &HashMap::new()) + } + + /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path + #[cfg(feature = "hdfs")] + pub(crate) fn prepare_object_store( + runtime_env: Arc, + url: String, + ) -> Result<(ObjectStoreUrl, Path), crate::execution::operators::ExecutionError> { + use crate::parquet::parquet_support::prepare_object_store_with_configs; + use std::collections::HashMap; prepare_object_store_with_configs(runtime_env, url, &HashMap::new()) } @@ -510,7 +525,6 @@ mod tests { #[test] fn test_prepare_object_store() { use crate::execution::operators::ExecutionError; - use url::Url; let local_file_system_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet"; let hdfs_url = "hdfs://localhost:8020/comet/spark-warehouse/part-00000.snappy.parquet"; From 959f192b63d150be7f38a5ec43a704d3cd33e3dc Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 23 Dec 2025 20:35:35 -0800 Subject: [PATCH 08/11] [WIP] opendal writes --- native/core/src/parquet/parquet_support.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index da8f10d67e..68dc8ef328 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -489,10 +489,30 @@ pub(crate) fn prepare_object_store_with_configs( #[cfg(test)] mod tests { + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use datafusion::execution::object_store::ObjectStoreUrl; + #[cfg(any( + 
all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use datafusion::execution::runtime_env::RuntimeEnv; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use object_store::path::Path; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use std::sync::Arc; + #[cfg(any( + all(not(feature = "hdfs"), not(feature = "hdfs-opendal")), + feature = "hdfs" + ))] use url::Url; #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))] From 00d4457d7467cfacc9fbb602fe0f0cc46230abd8 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 24 Dec 2025 10:29:31 -0800 Subject: [PATCH 09/11] [WIP] opendal writes --- .../src/execution/operators/parquet_writer.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index c988db11c4..2ca1e9cfd5 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -332,18 +332,27 @@ impl ParquetWriterExec { )) } "local" => { - // For local file system, write directly to file + // For a local file system, write directly to file // Strip file:// or file: prefix if present let local_path = output_file_path .strip_prefix("file://") .or_else(|| output_file_path.strip_prefix("file:")) .unwrap_or(output_file_path); - // Create output directory - std::fs::create_dir_all(local_path).map_err(|e| { + // Extract the parent directory from the file path + let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| { + DataFusionError::Execution(format!( + "Failed to extract parent directory from path '{}'", + local_path + )) + })?; + + // Create the parent directory if it doesn't exist + std::fs::create_dir_all(output_dir).map_err(|e| { DataFusionError::Execution(format!( "Failed to create output directory '{}': {}", - local_path, e + output_dir.display(), + e )) })?; From 7e800b4eb661a3080291375447d3b1a37b530bd6 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 24 Dec 2025 12:39:42 -0800 Subject: [PATCH 10/11] [WIP] opendal writes --- .github/actions/rust-test/action.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml index 4c9b13a174..10fc1375f0 100644 --- a/.github/actions/rust-test/action.yaml +++ b/.github/actions/rust-test/action.yaml @@ -70,5 +70,7 @@ runs: shell: bash run: | cd native + # Set LD_LIBRARY_PATH to include JVM library path for tests that use JNI + export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH} RUST_BACKTRACE=1 cargo nextest run From 1a949c3303c7494ca613a369f1a6a1e3873d02ae Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 24 Dec 2025 13:17:37 -0800 Subject: [PATCH 11/11] Add support for remote Parquet writer with openDAL --- native/core/src/parquet/parquet_support.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 68dc8ef328..c9a27d7dcb 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -406,18 +406,6 @@ fn create_hdfs_object_store( Ok((Box::new(store), path)) } -/// Stub implementation when hdfs-opendal feature is not enabled -#[cfg(not(feature = "hdfs-opendal"))] -pub async fn write_to_hdfs_with_opendal_async( - _url: &Url, - _data: bytes::Bytes, -) 
-> Result<(), object_store::Error> { - Err(object_store::Error::Generic { - store: "hdfs-opendal", - source: "HDFS OpenDAL support is not enabled in this build".into(), - }) -} - #[cfg(feature = "hdfs-opendal")] fn get_name_node_uri(url: &Url) -> Result { use std::fmt::Write;