
Commit 2ac032b

fix: emit empty RecordBatch for empty file writes (#19370)
## Which issue does this PR close?

- Closes #16240.

## Rationale for this change

If the input stream yields no RecordBatch at all, nothing gets sent downstream, and the writer never has a chance to produce a valid file. I added a small fallback: when single_file_output is enabled and no batches were received, we send a single empty RecordBatch with the input schema.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

1. I'm not fully convinced this logic belongs in the demuxer. Conceptually, it might be cleaner to handle this one layer downstream, on the consumer side. However, that layer does not currently have access to the schema, so moving the logic there would require a larger refactor. For now, I chose the minimal change that fixes the issue while keeping the impact small.
2. Arrow seems to be a special case with little test coverage in this area, so I added test cases for it.
1 parent 8809dae commit 2ac032b
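As a quick illustration of the user-facing effect, here is a hedged sketch (it assumes the `datafusion` and `tokio` crates and mirrors the Arrow test added in this commit; the output path is arbitrary). Copying an empty query result to a single file now produces a schema-bearing file on disk instead of writing nothing:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Zero rows, but a well-defined schema (a single BIGINT column named "id").
    ctx.sql("COPY (SELECT CAST(1 AS BIGINT) AS id LIMIT 0) TO '/tmp/empty.arrow' STORED AS ARROW")
        .await?
        .collect()
        .await?;

    // With this fix, the file exists even though it contains no rows.
    assert!(std::path::Path::new("/tmp/empty.arrow").exists());
    Ok(())
}
```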

File tree

6 files changed: +238 -0 lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -107,6 +107,8 @@ impl DataFrameWriteOptions {
     }

     /// Set the single_file_output value to true or false
+    ///
+    /// When set to true, an output file will always be created even if the DataFrame is empty
     pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
         self.single_file_output = single_file_output;
         self
```
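A hedged usage sketch of this option (the helper function is hypothetical; `write_csv` and `DataFrameWriteOptions::new()` match the calls used by the tests in this commit). With `single_file_output` set, a file is created even when the DataFrame yields no rows:

```rust
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::error::Result;

/// Hypothetical helper: write `df` to a single CSV file, producing the file
/// even when the DataFrame is empty.
async fn write_single_csv(df: DataFrame, path: &str) -> Result<()> {
    let opts = DataFrameWriteOptions::new().with_single_file_output(true);
    df.write_csv(path, opts, None).await?;
    Ok(())
}
```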

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 93 additions & 0 deletions

```diff
@@ -17,3 +17,96 @@

 //! Re-exports the [`datafusion_datasource_arrow::file_format`] module, and contains tests for it.
 pub use datafusion_datasource_arrow::file_format::*;
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+    use std::sync::Arc;
+
+    use arrow::array::{Int64Array, StringArray};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::Result;
+
+    use crate::execution::options::ArrowReadOptions;
+    use crate::prelude::SessionContext;
+
+    #[tokio::test]
+    async fn test_write_empty_arrow_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.arrow", tmp_dir.path().to_string_lossy());
+
+        ctx.sql(&format!(
+            "COPY (SELECT CAST(1 AS BIGINT) AS id LIMIT 0) TO '{path}' STORED AS ARROW",
+        ))
+        .await?
+        .collect()
+        .await?;
+
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
+        let stream = read_df.execute_stream().await?;
+
+        assert_eq!(stream.schema().fields().len(), 1);
+        assert_eq!(stream.schema().field(0).name(), "id");
+
+        let results: Vec<_> = stream.collect().await;
+        let total_rows: usize = results
+            .iter()
+            .filter_map(|r| r.as_ref().ok())
+            .map(|b| b.num_rows())
+            .sum();
+        assert_eq!(total_rows, 0);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_arrow_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int64Array::from(Vec::<i64>::new())),
+                Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.arrow", tmp_dir.path().to_string_lossy());
+
+        ctx.register_batch("empty_table", empty_batch)?;
+
+        ctx.sql(&format!("COPY empty_table TO '{path}' STORED AS ARROW"))
+            .await?
+            .collect()
+            .await?;
+
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
+        let stream = read_df.execute_stream().await?;
+
+        assert_eq!(stream.schema().fields().len(), 2);
+        assert_eq!(stream.schema().field(0).name(), "id");
+        assert_eq!(stream.schema().field(1).name(), "name");
+
+        let results: Vec<_> = stream.collect().await;
+        let total_rows: usize = results
+            .iter()
+            .filter_map(|r| r.as_ref().ok())
+            .map(|b| b.num_rows())
+            .sum();
+        assert_eq!(total_rows, 0);
+
+        Ok(())
+    }
+}
```

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 62 additions & 0 deletions

```diff
@@ -1537,6 +1537,68 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_write_empty_csv_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.csv", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_csv(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx
+            .read_csv(&path, CsvReadOptions::default().has_header(true))
+            .await?;
+        let stream = read_df.execute_stream().await?;
+        assert_eq!(stream.schema().fields().len(), 1);
+        assert_eq!(stream.schema().field(0).name(), "id");
+
+        let results: Vec<_> = stream.collect().await;
+        assert_eq!(results.len(), 0);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_csv_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.csv", tmp_dir.path().to_string_lossy());
+
+        // Write empty RecordBatch
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_csv(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expected the file to exist
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx
+            .read_csv(&path, CsvReadOptions::default().has_header(true))
+            .await?;
+        let stream = read_df.execute_stream().await?;
+        assert_eq!(stream.schema().fields().len(), 2);
+        assert_eq!(stream.schema().field(0).name(), "id");
+        assert_eq!(stream.schema().field(1).name(), "name");
+
+        let results: Vec<_> = stream.collect().await;
+        assert_eq!(results.len(), 0);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_infer_schema_with_zero_max_records() -> Result<()> {
         let session_ctx = SessionContext::new();
```

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 42 additions & 0 deletions

```diff
@@ -349,4 +349,46 @@ mod tests {
     fn fmt_batches(batches: &[RecordBatch]) -> String {
         pretty::pretty_format_batches(batches).unwrap().to_string()
     }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expected the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expected the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
 }
```

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 22 additions & 0 deletions

```diff
@@ -1366,6 +1366,28 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_write_empty_parquet_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.parquet", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS INT) AS id LIMIT 0").await?;
+        df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expected the file to exist
+        assert!(std::path::Path::new(&path).exists());
+        let read_df = ctx.read_parquet(&path, ParquetReadOptions::new()).await?;
+        let stream = read_df.execute_stream().await?;
+        assert_eq!(stream.schema().fields().len(), 1);
+        assert_eq!(stream.schema().field(0).name(), "id");
+
+        let results: Vec<_> = stream.collect().await;
+        assert_eq!(results.len(), 0);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
         // expected kv metadata without schema
```

datafusion/datasource/src/write/demux.rs

Lines changed: 17 additions & 0 deletions

```diff
@@ -191,7 +191,11 @@ async fn row_count_demuxer(
         part_idx += 1;
     }

+    let schema = input.schema();
+    let mut is_batch_received = false;
+
     while let Some(rb) = input.next().await.transpose()? {
+        is_batch_received = true;
         // ensure we have at least minimum_parallel_files open
         if open_file_streams.len() < minimum_parallel_files {
             open_file_streams.push(create_new_file_stream(
@@ -228,6 +232,19 @@ async fn row_count_demuxer(

         next_send_steam = (next_send_steam + 1) % minimum_parallel_files;
     }
+
+    // if no batch was received but a single output file is expected, send an empty batch
+    if single_file_output && !is_batch_received {
+        open_file_streams
+            .first_mut()
+            .ok_or_else(|| internal_datafusion_err!("Expected a single output file"))?
+            .send(RecordBatch::new_empty(schema))
+            .await
+            .map_err(|_| {
+                exec_datafusion_err!("Error sending empty RecordBatch to file stream!")
+            })?;
+    }
+
     Ok(())
 }
```
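For context on the fallback payload itself, a minimal standalone sketch (it assumes only the `arrow` crate; the schema is illustrative, whereas the demuxer uses the input stream's schema): `RecordBatch::new_empty` builds a zero-row batch that still carries the schema, which is what lets the downstream writer produce a valid, schema-bearing file.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Illustrative schema; the demuxer passes along whatever the input stream declares.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // Zero rows, but the schema is preserved, so format writers can still emit
    // their header/metadata (e.g. a CSV header row or Arrow/Parquet file schema).
    let batch = RecordBatch::new_empty(Arc::clone(&schema));
    assert_eq!(batch.num_rows(), 0);
    assert_eq!(batch.schema(), schema);
}
```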
