diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 1469aad5417b8..88322fe09abe8 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -106,10 +106,11 @@ cargo run --example dataframe -- dataframe #### Category: Single Process -| Subcommand | File Path | Description | -| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------ | -| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output | -| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs | +| Subcommand | File Path | Description | +| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------- | +| cache_factory | [`dataframe/cache_factory.rs`](examples/dataframe/cache_factory.rs) | Custom lazy caching for DataFrames using `CacheFactory` | +| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output | +| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs | ## Execution Monitoring Examples diff --git a/datafusion-examples/data/README.md b/datafusion-examples/data/README.md new file mode 100644 index 0000000000000..e8296a8856e60 --- /dev/null +++ b/datafusion-examples/data/README.md @@ -0,0 +1,25 @@ + + +## Example datasets + +| Filename | Path | Description | +| ----------- | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `cars.csv` | [`data/csv/cars.csv`](./csv/cars.csv) | Time-series–like dataset containing car identifiers, speed values, and timestamps. Used in window function and time-based query examples (e.g. ordering, window frames). | +| `regex.csv` | [`data/csv/regex.csv`](./csv/regex.csv) | Dataset for regular expression examples. Contains input values, regex patterns, replacement strings, and optional flags. Covers ASCII, Unicode, and locale-specific text processing. 
| diff --git a/datafusion-examples/data/csv/cars.csv b/datafusion-examples/data/csv/cars.csv new file mode 100644 index 0000000000000..bc40f3b01e7a5 --- /dev/null +++ b/datafusion-examples/data/csv/cars.csv @@ -0,0 +1,26 @@ +car,speed,time +red,20.0,1996-04-12T12:05:03.000000000 +red,20.3,1996-04-12T12:05:04.000000000 +red,21.4,1996-04-12T12:05:05.000000000 +red,21.5,1996-04-12T12:05:06.000000000 +red,19.0,1996-04-12T12:05:07.000000000 +red,18.0,1996-04-12T12:05:08.000000000 +red,17.0,1996-04-12T12:05:09.000000000 +red,7.0,1996-04-12T12:05:10.000000000 +red,7.1,1996-04-12T12:05:11.000000000 +red,7.2,1996-04-12T12:05:12.000000000 +red,3.0,1996-04-12T12:05:13.000000000 +red,1.0,1996-04-12T12:05:14.000000000 +red,0.0,1996-04-12T12:05:15.000000000 +green,10.0,1996-04-12T12:05:03.000000000 +green,10.3,1996-04-12T12:05:04.000000000 +green,10.4,1996-04-12T12:05:05.000000000 +green,10.5,1996-04-12T12:05:06.000000000 +green,11.0,1996-04-12T12:05:07.000000000 +green,12.0,1996-04-12T12:05:08.000000000 +green,14.0,1996-04-12T12:05:09.000000000 +green,15.0,1996-04-12T12:05:10.000000000 +green,15.1,1996-04-12T12:05:11.000000000 +green,15.2,1996-04-12T12:05:12.000000000 +green,8.0,1996-04-12T12:05:13.000000000 +green,2.0,1996-04-12T12:05:14.000000000 diff --git a/datafusion-examples/data/csv/regex.csv b/datafusion-examples/data/csv/regex.csv new file mode 100644 index 0000000000000..b249c39522b60 --- /dev/null +++ b/datafusion-examples/data/csv/regex.csv @@ -0,0 +1,12 @@ +values,patterns,replacement,flags +abc,^(a),bb\1bb,i +ABC,^(A).*,B,i +aBc,(b|d),e,i +AbC,(B|D),e, +aBC,^(b|c),d, +4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz, +4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz, +Düsseldorf,[\p{Letter}-]+,München, +Москва,[\p{L}-]+,Moscow, +Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln, +اليوم,^\p{Arabic}+$,Today, \ No newline at end of file diff --git a/datafusion-examples/examples/builtin_functions/regexp.rs b/datafusion-examples/examples/builtin_functions/regexp.rs index e8376cd0c94eb..5a9fd0839b2be 100644 --- a/datafusion-examples/examples/builtin_functions/regexp.rs +++ b/datafusion-examples/examples/builtin_functions/regexp.rs @@ -1,5 +1,4 @@ // Licensed to the Apache Software Foundation (ASF) under one -// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -18,12 +17,11 @@ //! See `main.rs` for how to run it. 
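For reference, the columns of `regex.csv` line up with the arguments of the `regexp_*` functions, so the fixture can be exercised column-wise. A minimal sketch, assuming the `examples` table registered below (the exact queries in `regexp.rs` may differ; the calls here are illustrative only):

```rust
// Illustrative only: apply the regexp_* functions column-wise over regex.csv.
// `values` must be double-quoted because VALUES is a SQL keyword.
ctx.sql(
    r#"SELECT "values",
              regexp_like("values", patterns, flags) AS matches,
              regexp_replace("values", patterns, replacement, flags) AS replaced
       FROM examples"#,
)
.await?
.show()
.await?;
```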
-use std::{fs::File, io::Write}; +use std::path::PathBuf; use datafusion::common::{assert_batches_eq, assert_contains}; use datafusion::error::Result; use datafusion::prelude::*; -use tempfile::tempdir; /// This example demonstrates how to use the regexp_* functions /// @@ -35,29 +33,12 @@ use tempfile::tempdir; /// https://docs.rs/regex/latest/regex/#grouping-and-flags pub async fn regexp() -> Result<()> { let ctx = SessionContext::new(); - // content from file 'datafusion/physical-expr/tests/data/regex.csv' - let csv_data = r#"values,patterns,replacement,flags -abc,^(a),bb\1bb,i -ABC,^(A).*,B,i -aBc,(b|d),e,i -AbC,(B|D),e, -aBC,^(b|c),d, -4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz, -4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz, -Düsseldorf,[\p{Letter}-]+,München, -Москва,[\p{L}-]+,Moscow, -Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln, -اليوم,^\p{Arabic}+$,Today,"#; - let dir = tempdir()?; - let file_path = dir.path().join("regex.csv"); - { - let mut file = File::create(&file_path)?; - // write CSV data - file.write_all(csv_data.as_bytes())?; - } // scope closes the file - let file_path = file_path.to_str().unwrap(); - - ctx.register_csv("examples", file_path, CsvReadOptions::new()) + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("regex.csv"); + + ctx.register_csv("examples", path.to_str().unwrap(), CsvReadOptions::new()) .await?; // diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 7b2e321362632..792f0de34d302 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -17,9 +17,10 @@ //! See `main.rs` for how to run it. +use std::path::PathBuf; use std::sync::Arc; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::config::CsvOptions; use datafusion::{ assert_batches_eq, @@ -31,7 +32,6 @@ use datafusion::{ }, error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, - test_util::aggr_test_schema, }; use datafusion::datasource::physical_plan::FileScanConfigBuilder; @@ -50,12 +50,20 @@ pub async fn csv_json_opener() -> Result<()> { async fn csv_opener() -> Result<()> { let object_store = Arc::new(LocalFileSystem::new()); - let schema = aggr_test_schema(); - - let testdata = datafusion::test_util::arrow_test_data(); - let path = format!("{testdata}/csv/aggregate_test_100.csv"); + let schema = Arc::new(Schema::new(vec![ + Field::new("car", DataType::Utf8, false), + Field::new("speed", DataType::Float64, false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + ])); - let path = std::path::Path::new(&path).canonicalize()?; + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); let options = CsvOptions { has_header: Some(true), @@ -71,7 +79,7 @@ async fn csv_opener() -> Result<()> { let scan_config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source) - .with_projection_indices(Some(vec![12, 0]))? + .with_projection_indices(Some(vec![0, 1]))? 
.with_limit(Some(5)) .with_file(PartitionedFile::new(path.display().to_string(), 10)) .build(); @@ -89,15 +97,15 @@ async fn csv_opener() -> Result<()> { } assert_batches_eq!( &[ - "+--------------------------------+----+", - "| c13 | c1 |", - "+--------------------------------+----+", - "| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c |", - "| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d |", - "| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b |", - "| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a |", - "| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b |", - "+--------------------------------+----+", + "+-----+-------+", + "| car | speed |", + "+-----+-------+", + "| red | 20.0 |", + "| red | 20.3 |", + "| red | 21.4 |", + "| red | 21.5 |", + "| red | 19.0 |", + "+-----+-------+", ], &result ); diff --git a/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs b/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs index 554382ea9549e..ca4cac0d2c786 100644 --- a/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs +++ b/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs @@ -17,7 +17,8 @@ //! See `main.rs` for how to run it. -use datafusion::common::test_util::datafusion_test_data; +use std::path::PathBuf; + use datafusion::error::Result; use datafusion::prelude::*; @@ -27,33 +28,36 @@ pub async fn csv_sql_streaming() -> Result<()> { // create local execution context let ctx = SessionContext::new(); - let testdata = datafusion_test_data(); + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); - // Register a table source and tell DataFusion the file is ordered by `ts ASC`. + // Register a table source and tell DataFusion the file is ordered by `car ASC`. // Note it is the responsibility of the user to make sure // that file indeed satisfies this condition or else incorrect answers may be produced. let asc = true; let nulls_first = true; - let sort_expr = vec![col("ts").sort(asc, nulls_first)]; + let sort_expr = vec![col("car").sort(asc, nulls_first)]; // register csv file with the execution context ctx.register_csv( "ordered_table", - &format!("{testdata}/window_1.csv"), + path.to_str().unwrap(), CsvReadOptions::new().file_sort_order(vec![sort_expr]), ) .await?; // execute the query - // Following query can be executed with unbounded sources because group by expressions (e.g ts) is + // Following query can be executed with unbounded sources because group by expressions (e.g car) is // already ordered at the source. // // Unbounded sources means that if the input came from a "never ending" source (such as a FIFO // file on unix) the query could produce results incrementally as data was read. let df = ctx .sql( - "SELECT ts, MIN(inc_col), MAX(inc_col) \ + "SELECT car, MIN(speed), MAX(speed) \ FROM ordered_table \ - GROUP BY ts", + GROUP BY car", ) .await?; @@ -64,7 +68,7 @@ pub async fn csv_sql_streaming() -> Result<()> { // its result in streaming fashion, because its required ordering is already satisfied at the source. let df = ctx .sql( - "SELECT ts, SUM(inc_col) OVER(ORDER BY ts ASC) \ + "SELECT car, SUM(speed) OVER(ORDER BY car ASC) \ FROM ordered_table", ) .await?; diff --git a/datafusion-examples/examples/data_io/parquet_encrypted.rs b/datafusion-examples/examples/data_io/parquet_encrypted.rs index f88ab91321e91..1cdd57a8bef67 100644 --- a/datafusion-examples/examples/data_io/parquet_encrypted.rs +++ b/datafusion-examples/examples/data_io/parquet_encrypted.rs @@ -17,28 +17,49 @@ //! See `main.rs` for how to run it. 
+use std::path::PathBuf; +use std::sync::Arc; + use datafusion::common::DataFusionError; use datafusion::config::{ConfigFileEncryptionProperties, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::logical_expr::{col, lit}; use datafusion::parquet::encryption::decrypt::FileDecryptionProperties; use datafusion::parquet::encryption::encrypt::FileEncryptionProperties; +use datafusion::prelude::CsvReadOptions; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use std::sync::Arc; use tempfile::TempDir; +use tokio::fs::create_dir_all; /// Read and write encrypted Parquet files using DataFusion pub async fn parquet_encrypted() -> datafusion::common::Result<()> { // The SessionContext is the main high level API for interacting with DataFusion let ctx = SessionContext::new(); - // Find the local path of "alltypes_plain.parquet" - let testdata = datafusion::test_util::parquet_test_data(); - let filename = &format!("{testdata}/alltypes_plain.parquet"); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; // Read the sample parquet file let parquet_df = ctx - .read_parquet(filename, ParquetReadOptions::default()) + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) .await?; // Show information from the dataframe @@ -52,27 +73,27 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> { let (encrypt, decrypt) = setup_encryption(&parquet_df)?; // Create a temporary file location for the encrypted parquet file - let tmp_dir = TempDir::new()?; - let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet"); - let tempfile_str = tempfile.into_os_string().into_string().unwrap(); + let tempfile = tmp_source.path().join("cars_encrypted"); // Write encrypted parquet let mut options = TableParquetOptions::default(); options.crypto.file_encryption = Some(ConfigFileEncryptionProperties::from(&encrypt)); parquet_df .write_parquet( - tempfile_str.as_str(), + tempfile.to_str().unwrap(), DataFrameWriteOptions::new().with_single_file_output(true), Some(options), ) .await?; - // Read encrypted parquet + // Read encrypted parquet back as a DataFrame using matching decryption config let ctx: SessionContext = SessionContext::new(); let read_options = ParquetReadOptions::default().file_decryption_properties((&decrypt).into()); - let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?; + let encrypted_parquet_df = ctx + .read_parquet(tempfile.to_str().unwrap(), read_options) + .await?; // Show information from the dataframe println!( @@ -91,11 +112,12 @@ async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> { df.clone().describe().await?.show().await?; // Select three columns and filter the results - // so that only rows where id > 1 are returned + // so that only rows where speed > 5 are returned + // select car, speed, time from t where speed > 5 println!("\nSelected rows and columns:"); 
df.clone() - .select_columns(&["id", "bool_col", "timestamp_col"])? - .filter(col("id").gt(lit(5)))? + .select_columns(&["car", "speed", "time"])? + .filter(col("speed").gt(lit(5)))? .show() .await?; diff --git a/datafusion-examples/examples/data_io/parquet_exec_visitor.rs b/datafusion-examples/examples/data_io/parquet_exec_visitor.rs index d38fe9e171205..d9577e5010a92 100644 --- a/datafusion-examples/examples/data_io/parquet_exec_visitor.rs +++ b/datafusion-examples/examples/data_io/parquet_exec_visitor.rs @@ -17,8 +17,10 @@ //! See `main.rs` for how to run it. +use std::path::PathBuf; use std::sync::Arc; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::datasource::physical_plan::{FileGroup, ParquetSource}; @@ -29,23 +31,47 @@ use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::{ ExecutionPlan, ExecutionPlanVisitor, execute_stream, visit_execution_plan, }; +use datafusion::prelude::CsvReadOptions; use futures::StreamExt; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// Example of collecting metrics after execution by visiting the `ExecutionPlan` pub async fn parquet_exec_visitor() -> datafusion::common::Result<()> { let ctx = SessionContext::new(); - let test_data = datafusion::test_util::parquet_test_data(); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; // Configure listing options let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)); + let table_path = format!("file://{}", out_dir.to_str().unwrap()); + // First example where we use an absolute path, which requires no additional setup.
let _ = ctx .register_listing_table( "my_table", - &format!("file://{test_data}/alltypes_plain.parquet"), + &table_path, listing_options.clone(), None, None, diff --git a/datafusion-examples/examples/dataframe/cache_factory.rs b/datafusion-examples/examples/dataframe/cache_factory.rs index a6c465720c626..c76e3796b9165 100644 --- a/datafusion-examples/examples/dataframe/cache_factory.rs +++ b/datafusion-examples/examples/dataframe/cache_factory.rs @@ -19,6 +19,7 @@ use std::fmt::Debug; use std::hash::Hash; +use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; @@ -26,6 +27,7 @@ use arrow::array::RecordBatch; use async_trait::async_trait; use datafusion::catalog::memory::MemorySourceConfig; use datafusion::common::DFSchemaRef; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::Result; use datafusion::execution::SessionState; use datafusion::execution::SessionStateBuilder; @@ -44,6 +46,8 @@ use datafusion::prelude::ParquetReadOptions; use datafusion::prelude::SessionContext; use datafusion::prelude::*; use datafusion_common::HashMap; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// This example demonstrates how to leverage [CacheFactory] to implement custom caching strategies for dataframes in DataFusion. /// By default, [DataFrame::cache] in Datafusion is eager and creates an in-memory table. This example shows a basic alternative implementation for lazy caching. @@ -53,28 +57,46 @@ use datafusion_common::HashMap; /// - A [CacheNodeQueryPlanner] that installs [CacheNodePlanner]. /// - A simple in-memory [CacheManager] that stores cached [RecordBatch]es. Note that the implementation for this example is very naive and only implements put, but for real production use cases cache eviction and drop should also be implemented. pub async fn cache_dataframe_with_custom_logic() -> Result<()> { - let testdata = datafusion::test_util::parquet_test_data(); - let filename = &format!("{testdata}/alltypes_plain.parquet"); - let session_state = SessionStateBuilder::new() .with_cache_factory(Some(Arc::new(CustomCacheFactory {}))) .with_query_planner(Arc::new(CacheNodeQueryPlanner::default())) .build(); let ctx = SessionContext::new_with_state(session_state); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; + // Read the parquet files and show its schema using 'describe' let parquet_df = ctx - .read_parquet(filename, ParquetReadOptions::default()) + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) .await?; let df_cached = parquet_df - .select_columns(&["id", "bool_col", "timestamp_col"])? - .filter(col("id").gt(lit(1)))? + .select_columns(&["car", "speed", "time"])? + .filter(col("speed").gt(lit(1.0)))? 
.cache() .await?; - let df1 = df_cached.clone().filter(col("bool_col").is_true())?; - let df2 = df1.clone().sort(vec![col("id").sort(true, false)])?; + let df1 = df_cached.clone().filter(col("car").eq(lit("red")))?; + let df2 = df1.clone().sort(vec![col("car").sort(true, false)])?; // should see log for caching only once df_cached.show().await?; diff --git a/datafusion-examples/examples/dataframe/dataframe.rs b/datafusion-examples/examples/dataframe/dataframe.rs index 94653e80c8695..b6cc6540f5ce0 100644 --- a/datafusion-examples/examples/dataframe/dataframe.rs +++ b/datafusion-examples/examples/dataframe/dataframe.rs @@ -17,6 +17,11 @@ //! See `main.rs` for how to run it. +use std::fs::File; +use std::io::Write; +use std::path::PathBuf; +use std::sync::Arc; + use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StringViewArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::catalog::MemTable; @@ -28,10 +33,8 @@ use datafusion::error::Result; use datafusion::functions_aggregate::average::avg; use datafusion::functions_aggregate::min_max::max; use datafusion::prelude::*; -use std::fs::{File, create_dir_all}; -use std::io::Write; -use std::sync::Arc; use tempfile::{TempDir, tempdir}; +use tokio::fs::create_dir_all; /// This example demonstrates using DataFusion's DataFrame API /// @@ -64,8 +67,8 @@ pub async fn dataframe_example() -> Result<()> { read_memory(&ctx).await?; read_memory_macro().await?; write_out(&ctx).await?; - register_aggregate_test_data("t1", &ctx).await?; - register_aggregate_test_data("t2", &ctx).await?; + register_cars_test_data("t1", &ctx).await?; + register_cars_test_data("t2", &ctx).await?; where_scalar_subquery(&ctx).await?; where_in_subquery(&ctx).await?; where_exist_subquery(&ctx).await?; @@ -77,23 +80,41 @@ pub async fn dataframe_example() -> Result<()> { /// 2. Show the schema /// 3. Select columns and rows async fn read_parquet(ctx: &SessionContext) -> Result<()> { - // Find the local path of "alltypes_plain.parquet" - let testdata = datafusion::test_util::parquet_test_data(); - let filename = &format!("{testdata}/alltypes_plain.parquet"); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; // Read the parquet files and show its schema using 'describe' let parquet_df = ctx - .read_parquet(filename, ParquetReadOptions::default()) + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) .await?; // show its schema using 'describe' parquet_df.clone().describe().await?.show().await?; // Select three columns and filter the results - // so that only rows where id > 1 are returned + // so that only rows where speed > 1 are returned + // select car, speed, time from t where speed > 1 parquet_df - .select_columns(&["id", "bool_col", "timestamp_col"])? - .filter(col("id").gt(lit(1)))? + .select_columns(&["car", "speed", "time"])? + .filter(col("speed").gt(lit(1)))? 
.show() .await?; @@ -211,15 +232,15 @@ async fn write_out(ctx: &SessionContext) -> Result<()> { // Create a single temp root with subdirectories let tmp_root = TempDir::new()?; let examples_root = tmp_root.path().join("datafusion-examples"); - create_dir_all(&examples_root)?; + create_dir_all(&examples_root).await?; let table_dir = examples_root.join("test_table"); let parquet_dir = examples_root.join("test_parquet"); let csv_dir = examples_root.join("test_csv"); let json_dir = examples_root.join("test_json"); - create_dir_all(&table_dir)?; - create_dir_all(&parquet_dir)?; - create_dir_all(&csv_dir)?; - create_dir_all(&json_dir)?; + create_dir_all(&table_dir).await?; + create_dir_all(&parquet_dir).await?; + create_dir_all(&csv_dir).await?; + create_dir_all(&json_dir).await?; let create_sql = format!( "CREATE EXTERNAL TABLE test(tablecol1 varchar) @@ -266,7 +287,7 @@ async fn write_out(ctx: &SessionContext) -> Result<()> { } /// Use the DataFrame API to execute the following subquery: -/// select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 limit 3; +/// select car, speed from t1 where (select avg(t2.speed) from t2 where t1.car = t2.car) > 0 limit 3; async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { ctx.table("t1") .await? @@ -274,14 +295,14 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { scalar_subquery(Arc::new( ctx.table("t2") .await? - .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))? - .aggregate(vec![], vec![avg(col("t2.c2"))])? - .select(vec![avg(col("t2.c2"))])? + .filter(out_ref_col(DataType::Utf8, "t1.car").eq(col("t2.car")))? + .aggregate(vec![], vec![avg(col("t2.speed"))])? + .select(vec![avg(col("t2.speed"))])? .into_unoptimized_plan(), )) - .gt(lit(0u8)), + .gt(lit(0.0)), )? - .select(vec![col("t1.c1"), col("t1.c2")])? + .select(vec![col("t1.car"), col("t1.speed")])? .limit(0, Some(3))? .show() .await?; @@ -289,22 +310,24 @@ async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> { } /// Use the DataFrame API to execute the following subquery: -/// select t1.c1, t1.c2 from t1 where t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0 ) limit 3; +/// select t1.car, t1.speed from t1 where t1.speed in (select max(t2.speed) from t2 where t2.car = 'red') limit 3; async fn where_in_subquery(ctx: &SessionContext) -> Result<()> { ctx.table("t1") .await? .filter(in_subquery( - col("t1.c2"), + col("t1.speed"), Arc::new( ctx.table("t2") .await? - .filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))? - .aggregate(vec![], vec![max(col("t2.c2"))])? - .select(vec![max(col("t2.c2"))])? + .filter( + col("t2.car").eq(lit(ScalarValue::Utf8(Some("red".to_string())))), + )? + .aggregate(vec![], vec![max(col("t2.speed"))])? + .select(vec![max(col("t2.speed"))])? .into_unoptimized_plan(), ), ))? - .select(vec![col("t1.c1"), col("t1.c2")])? + .select(vec![col("t1.car"), col("t1.speed")])? .limit(0, Some(3))? .show() .await?; @@ -312,31 +335,31 @@ async fn where_in_subquery(ctx: &SessionContext) -> Result<()> { } /// Use the DataFrame API to execute the following subquery: -/// select t1.c1, t1.c2 from t1 where exists (select t2.c2 from t2 where t1.c1 = t2.c1) limit 3; +/// select t1.car, t1.speed from t1 where exists (select t2.speed from t2 where t1.car = t2.car) limit 3; async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> { ctx.table("t1") .await? .filter(exists(Arc::new( ctx.table("t2") .await? - .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))? 
- .select(vec![col("t2.c2")])? + .filter(out_ref_col(DataType::Utf8, "t1.car").eq(col("t2.car")))? + .select(vec![col("t2.speed")])? .into_unoptimized_plan(), )))? - .select(vec![col("t1.c1"), col("t1.c2")])? + .select(vec![col("t1.car"), col("t1.speed")])? .limit(0, Some(3))? .show() .await?; Ok(()) } -async fn register_aggregate_test_data(name: &str, ctx: &SessionContext) -> Result<()> { - let testdata = datafusion::test_util::arrow_test_data(); - ctx.register_csv( - name, - &format!("{testdata}/csv/aggregate_test_100.csv"), - CsvReadOptions::default(), - ) - .await?; +async fn register_cars_test_data(name: &str, ctx: &SessionContext) -> Result<()> { + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + + ctx.register_csv(name, path.to_str().unwrap(), CsvReadOptions::default()) + .await?; Ok(()) } diff --git a/datafusion-examples/examples/dataframe/deserialize_to_struct.rs b/datafusion-examples/examples/dataframe/deserialize_to_struct.rs index e19d45554131a..2ece7063619c1 100644 --- a/datafusion-examples/examples/dataframe/deserialize_to_struct.rs +++ b/datafusion-examples/examples/dataframe/deserialize_to_struct.rs @@ -17,12 +17,16 @@ //! See `main.rs` for how to run it. -use arrow::array::{AsArray, PrimitiveArray}; -use arrow::datatypes::{Float64Type, Int32Type}; +use std::path::PathBuf; + +use arrow::array::{Array, Float64Array, StringViewArray}; use datafusion::common::assert_batches_eq; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::Result; use datafusion::prelude::*; use futures::StreamExt; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// This example shows how to convert query results into Rust structs by using /// the Arrow APIs to convert the results into Rust native types. @@ -34,63 +38,120 @@ use futures::StreamExt; pub async fn deserialize_to_struct() -> Result<()> { // Run a query that returns two columns of data let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. 
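The CSV-to-Parquet staging block that follows (read `cars.csv`, write it to a temporary Parquet directory) is repeated nearly verbatim in most of the examples touched by this change. If it keeps spreading, it could be factored into a small shared helper; a minimal sketch under that assumption, using the hypothetical helper name `stage_cars_parquet` (not part of this diff):

```rust
// Hypothetical helper (not part of this diff): stage the bundled cars.csv as a
// Parquet directory inside a caller-owned TempDir so examples stay self-contained
// without requiring DataFusion test files.
async fn stage_cars_parquet(ctx: &SessionContext, tmp: &TempDir) -> Result<String> {
    let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("data")
        .join("csv")
        .join("cars.csv");
    let out_dir = tmp.path().join("parquet_source");
    create_dir_all(&out_dir).await?;
    ctx.read_csv(csv.to_str().unwrap(), CsvReadOptions::default())
        .await?
        .write_parquet(
            out_dir.to_str().unwrap(),
            DataFrameWriteOptions::default(),
            None,
        )
        .await?;
    Ok(out_dir.to_str().unwrap().to_string())
}
```

Each call site would then shrink to `let out_dir = stage_cars_parquet(&ctx, &tmp_source).await?;`.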
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; + ctx.register_parquet( - "alltypes_plain", - &format!("{testdata}/alltypes_plain.parquet"), + "cars", + out_dir.to_str().unwrap(), ParquetReadOptions::default(), ) .await?; + let df = ctx - .sql("SELECT int_col, double_col FROM alltypes_plain") + .sql("SELECT car, speed FROM cars ORDER BY speed LIMIT 50") .await?; - // print out the results showing we have an int32 and a float64 column + // print out the results showing we have car and speed columns and a deterministic ordering let results = df.clone().collect().await?; assert_batches_eq!( [ - "+---------+------------+", - "| int_col | double_col |", - "+---------+------------+", - "| 0 | 0.0 |", - "| 1 | 10.1 |", - "| 0 | 0.0 |", - "| 1 | 10.1 |", - "| 0 | 0.0 |", - "| 1 | 10.1 |", - "| 0 | 0.0 |", - "| 1 | 10.1 |", - "+---------+------------+", + "+-------+-------+", + "| car | speed |", + "+-------+-------+", + "| red | 0.0 |", + "| red | 1.0 |", + "| green | 2.0 |", + "| red | 3.0 |", + "| red | 7.0 |", + "| red | 7.1 |", + "| red | 7.2 |", + "| green | 8.0 |", + "| green | 10.0 |", + "| green | 10.3 |", + "| green | 10.4 |", + "| green | 10.5 |", + "| green | 11.0 |", + "| green | 12.0 |", + "| green | 14.0 |", + "| green | 15.0 |", + "| green | 15.1 |", + "| green | 15.2 |", + "| red | 17.0 |", + "| red | 18.0 |", + "| red | 19.0 |", + "| red | 20.0 |", + "| red | 20.3 |", + "| red | 21.4 |", + "| red | 21.5 |", + "+-------+-------+", ], &results ); // We will now convert the query results into a Rust struct let mut stream = df.execute_stream().await?; - let mut list = vec![]; + let mut list: Vec<Data> = vec![]; // DataFusion produces data in chunks called `RecordBatch`es which are // typically 8000 rows each. This loop processes each `RecordBatch` as it is // produced by the query plan and adds it to the list - while let Some(b) = stream.next().await.transpose()? { + while let Some(batch) = stream.next().await.transpose()? { // Each `RecordBatch` has one or more columns. Each column is stored as // an `ArrayRef`. To interact with data using Rust native types we need to // convert these `ArrayRef`s into concrete array types using APIs from // the arrow crate.
// In this case, we know that each batch has two columns of the Arrow - types Int32 and Float64, so first we cast the two columns to the + types StringView and Float64, so first we cast the two columns to the // appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast).: - let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive(); - let float_col: &PrimitiveArray<Float64Type> = b.column(1).as_primitive(); + let car_col = batch + .column(0) + .as_any() + .downcast_ref::<StringViewArray>() + .expect("car column must be Utf8View"); + + let speed_col = batch + .column(1) + .as_any() + .downcast_ref::<Float64Array>() + .expect("speed column must be Float64"); // With PrimitiveArrays, we can access to the values as native Rust - // types i32 and f64, and forming the desired `Data` structs - for (i, f) in int_col.values().iter().zip(float_col.values()) { - list.push(Data { - int_col: *i, - double_col: *f, - }) + // types String and f64, and forming the desired `Data` structs + for i in 0..batch.num_rows() { + let car = if car_col.is_null(i) { + None + } else { + Some(car_col.value(i).to_string()) + }; + + let speed = if speed_col.is_null(i) { + None + } else { + Some(speed_col.value(i)) + }; + + list.push(Data { car, speed }); } } @@ -100,45 +161,220 @@ pub async fn deserialize_to_struct() -> Result<()> { res, r#"[ Data { - int_col: 0, - double_col: 0.0, + car: Some( + "red", + ), + speed: Some( + 0.0, + ), + }, + Data { + car: Some( + "red", + ), + speed: Some( + 1.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 2.0, + ), + }, + Data { + car: Some( + "red", + ), + speed: Some( + 3.0, + ), + }, + Data { + car: Some( + "red", + ), + speed: Some( + 7.0, + ), + }, + Data { + car: Some( + "red", + ), + speed: Some( + 7.1, + ), + }, + Data { + car: Some( + "red", + ), + speed: Some( + 7.2, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 8.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 10.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 10.3, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 10.4, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 10.5, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 11.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 12.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 14.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 15.0, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 15.1, + ), + }, + Data { + car: Some( + "green", + ), + speed: Some( + 15.2, + ), }, Data { - int_col: 1, - double_col: 10.1, + car: Some( + "red", + ), + speed: Some( + 17.0, + ), }, Data { - int_col: 0, - double_col: 0.0, + car: Some( + "red", + ), + speed: Some( + 18.0, + ), }, Data { - int_col: 1, - double_col: 10.1, + car: Some( + "red", + ), + speed: Some( + 19.0, + ), }, Data { - int_col: 0, - double_col: 0.0, + car: Some( + "red", + ), + speed: Some( + 20.0, + ), }, Data { - int_col: 1, - double_col: 10.1, + car: Some( + "red", + ), + speed: Some( + 20.3, + ), }, Data { - int_col: 0, - double_col: 0.0, + car: Some( + "red", + ), + speed: Some( + 21.4, + ), }, Data { - int_col: 1, - double_col: 10.1, + car: Some( + "red", + ), + speed: Some( + 21.5, + ), }, ]"# ); - // Use the fields in the struct to avoid clippy complaints - let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col); - let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col); - assert_eq!(int_sum, 4); - assert_eq!(double_sum, 40.4); + let
speed_green_sum: f64 = list + .iter() + .filter(|data| data.car.as_deref() == Some("green")) + .filter_map(|data| data.speed) + .sum(); + let speed_red_sum: f64 = list + .iter() + .filter(|data| data.car.as_deref() == Some("red")) + .filter_map(|data| data.speed) + .sum(); + assert_eq!(speed_green_sum, 133.5); + assert_eq!(speed_red_sum, 162.5); Ok(()) } @@ -146,6 +382,6 @@ pub async fn deserialize_to_struct() -> Result<()> { /// This is target struct where we want the query results. #[derive(Debug)] struct Data { - int_col: i32, - double_col: f64, + car: Option<String>, + speed: Option<f64>, } diff --git a/datafusion-examples/examples/dataframe/main.rs b/datafusion-examples/examples/dataframe/main.rs index 9a2604e97136d..cff18436a6a4e 100644 --- a/datafusion-examples/examples/dataframe/main.rs +++ b/datafusion-examples/examples/dataframe/main.rs @@ -21,7 +21,7 @@ //! //! ## Usage //! ```bash -//! cargo run --example dataframe -- [all|dataframe|deserialize_to_struct] +//! cargo run --example dataframe -- [all|dataframe|deserialize_to_struct|cache_factory] //! ``` //! //! Each subcommand runs a corresponding example: diff --git a/datafusion-examples/examples/execution_monitoring/tracing.rs b/datafusion-examples/examples/execution_monitoring/tracing.rs index 5fa759f2d541d..bf68c6b5346fb 100644 --- a/datafusion-examples/examples/execution_monitoring/tracing.rs +++ b/datafusion-examples/examples/execution_monitoring/tracing.rs @@ -51,16 +51,20 @@ //! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span ***** //! ``` +use std::any::Any; +use std::path::PathBuf; +use std::sync::Arc; + use datafusion::common::runtime::{JoinSetTracer, set_join_set_tracer}; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::prelude::*; -use datafusion::test_util::parquet_test_data; use futures::FutureExt; use futures::future::BoxFuture; -use std::any::Any; -use std::sync::Arc; +use tempfile::TempDir; +use tokio::fs::create_dir_all; use tracing::{Instrument, Level, Span, info, instrument}; /// Demonstrates the tracing injection feature for the DataFusion runtime @@ -126,18 +130,44 @@ async fn run_instrumented_query() -> Result<()> { info!("Starting query execution"); let ctx = SessionContext::new(); - let test_data = parquet_test_data(); + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files.
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; + let file_format = ParquetFormat::default().with_enable_pruning(true); - let listing_options = ListingOptions::new(Arc::new(file_format)) - .with_file_extension("alltypes_tiny_pages_plain.parquet"); + let listing_options = + ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet"); - let table_path = format!("file://{test_data}/"); - info!("Registering table 'alltypes' from {}", table_path); - ctx.register_listing_table("alltypes", &table_path, listing_options, None, None) - .await - .expect("Failed to register table"); + info!("Registering table 'cars' from {}", out_dir.to_str().unwrap()); + ctx.register_listing_table( + "cars", + out_dir.to_str().unwrap(), + listing_options, + None, + None, + ) + .await + .expect("Failed to register table"); - let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"; + let sql = "SELECT COUNT(*), car, sum(speed) FROM cars GROUP BY car"; info!(sql, "Executing SQL query"); let result = ctx.sql(sql).await?.collect().await?; info!("Query complete: {} batches returned", result.len()); diff --git a/datafusion-examples/examples/flight/client.rs b/datafusion-examples/examples/flight/client.rs index 484576975a6f2..9f75666316010 100644 --- a/datafusion-examples/examples/flight/client.rs +++ b/datafusion-examples/examples/flight/client.rs @@ -18,22 +18,47 @@ //! See `main.rs` for how to run it. use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; -use tonic::transport::Endpoint; - -use datafusion::arrow::datatypes::Schema; use arrow_flight::flight_descriptor; use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::utils::flight_data_to_arrow_batch; use arrow_flight::{FlightDescriptor, Ticket}; +use datafusion::arrow::datatypes::Schema; use datafusion::arrow::util::pretty; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::prelude::{CsvReadOptions, SessionContext}; +use tempfile::TempDir; +use tokio::fs::create_dir_all; +use tonic::transport::Endpoint; /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for /// Parquet files and executing SQL queries against them on a remote server. /// This example is run along-side the example `flight_server`. pub async fn client() -> Result<(), Box<dyn std::error::Error>> { - let testdata = datafusion::test_util::parquet_test_data(); + let ctx = SessionContext::new(); + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files.
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; // Create Flight client let endpoint = Endpoint::new("http://localhost:50051")?; @@ -44,7 +69,7 @@ pub async fn client() -> Result<(), Box> { let request = tonic::Request::new(FlightDescriptor { r#type: flight_descriptor::DescriptorType::Path as i32, cmd: Default::default(), - path: vec![format!("{testdata}/alltypes_plain.parquet")], + path: vec![format!("{}", out_dir.to_str().unwrap())], }); let schema_result = client.get_schema(request).await?.into_inner(); @@ -53,7 +78,7 @@ pub async fn client() -> Result<(), Box> { // Call do_get to execute a SQL query and receive results let request = tonic::Request::new(Ticket { - ticket: "SELECT id FROM alltypes_plain".into(), + ticket: "SELECT car FROM cars".into(), }); let mut stream = client.do_get(request).await?.into_inner(); diff --git a/datafusion-examples/examples/flight/server.rs b/datafusion-examples/examples/flight/server.rs index aad82e28b15ef..e98c7f89ea287 100644 --- a/datafusion-examples/examples/flight/server.rs +++ b/datafusion-examples/examples/flight/server.rs @@ -17,25 +17,27 @@ //! See `main.rs` for how to run it. -use arrow::ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator}; +use std::path::PathBuf; use std::sync::Arc; +use arrow::ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator}; +use arrow_flight::{ + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, + flight_service_server::FlightService, flight_service_server::FlightServiceServer, +}; use arrow_flight::{PollInfo, SchemaAsIpc}; use datafusion::arrow::error::ArrowError; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ListingOptions, ListingTableUrl}; +use datafusion::prelude::*; use futures::stream::BoxStream; +use tempfile::TempDir; +use tokio::fs::create_dir_all; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; -use datafusion::prelude::*; - -use arrow_flight::{ - Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, - flight_service_server::FlightService, flight_service_server::FlightServiceServer, -}; - #[derive(Clone)] pub struct FlightServiceImpl {} @@ -85,12 +87,33 @@ impl FlightService for FlightServiceImpl { // create local execution context let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. 
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await + .map_err(|_| Status::internal("Error reading cars.csv"))?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await + .map_err(|_| Status::internal("Error writing to parquet file"))?; // register parquet file with the execution context ctx.register_parquet( - "alltypes_plain", - &format!("{testdata}/alltypes_plain.parquet"), + "cars", + out_dir.to_str().unwrap(), ParquetReadOptions::default(), ) .await diff --git a/datafusion-examples/examples/flight/sql_server.rs b/datafusion-examples/examples/flight/sql_server.rs index 435e05ffc0cec..924de13a92331 100644 --- a/datafusion-examples/examples/flight/sql_server.rs +++ b/datafusion-examples/examples/flight/sql_server.rs @@ -17,6 +17,10 @@ //! See `main.rs` for how to run it. +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; + use arrow::array::{ArrayRef, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::ipc::writer::IpcWriteOptions; @@ -36,14 +40,17 @@ use arrow_flight::{ HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket, }; use dashmap::DashMap; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::logical_expr::LogicalPlan; -use datafusion::prelude::{DataFrame, ParquetReadOptions, SessionConfig, SessionContext}; +use datafusion::prelude::{ + CsvReadOptions, DataFrame, ParquetReadOptions, SessionConfig, SessionContext, +}; use futures::{Stream, StreamExt, TryStreamExt}; use log::info; use mimalloc::MiMalloc; use prost::Message; -use std::pin::Pin; -use std::sync::Arc; +use tempfile::TempDir; +use tokio::fs::create_dir_all; use tonic::metadata::MetadataValue; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; @@ -100,12 +107,34 @@ impl FlightSqlServiceImpl { .with_information_schema(true); let ctx = Arc::new(SessionContext::new_with_config(session_config)); - let testdata = datafusion::test_util::parquet_test_data(); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. 
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await + .map_err(|e| status!("Error reading cars.csv", e))?; + let tmp_source = + TempDir::new().map_err(|e| status!("Error creating temp dir", e))?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await + .map_err(|e| status!("Error writing to parquet", e))?; // register parquet file with the execution context ctx.register_parquet( - "alltypes_plain", - &format!("{testdata}/alltypes_plain.parquet"), + "cars", + out_dir.to_str().unwrap(), ParquetReadOptions::default(), ) .await diff --git a/datafusion-examples/examples/query_planning/parse_sql_expr.rs b/datafusion-examples/examples/query_planning/parse_sql_expr.rs index 376120de9d492..18b5d9432d35c 100644 --- a/datafusion-examples/examples/query_planning/parse_sql_expr.rs +++ b/datafusion-examples/examples/query_planning/parse_sql_expr.rs @@ -17,15 +17,22 @@ //! See `main.rs` for how to run it. +use std::path::PathBuf; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::DFSchema; +use datafusion::common::ScalarValue; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::logical_expr::{col, lit}; +use datafusion::prelude::CsvReadOptions; use datafusion::sql::unparser::Unparser; use datafusion::{ assert_batches_eq, error::Result, prelude::{ParquetReadOptions, SessionContext}, }; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// This example demonstrates the programmatic parsing of SQL expressions using /// the DataFusion [`SessionContext::parse_sql_expr`] API or the [`DataFrame::parse_sql_expr`] API. @@ -70,20 +77,38 @@ fn simple_session_context_parse_sql_expr_demo() -> Result<()> { /// DataFusion can parse a SQL text to an logical expression using schema at [`DataFrame`]. async fn simple_dataframe_parse_sql_expr_demo() -> Result<()> { - let sql = "int_col < 5 OR double_col = 8.0"; - let expr = col("int_col") - .lt(lit(5_i64)) - .or(col("double_col").eq(lit(8.0_f64))); + let sql = "car = 'red' OR speed > 1.0"; + let expr = col("car") + .eq(lit(ScalarValue::Utf8(Some("red".to_string())))) + .or(col("speed").gt(lit(1.0_f64))); let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let df = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. 
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) .await?; + let df = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) + .await?; + let parsed_expr = df.parse_sql_expr(sql)?; assert_eq!(parsed_expr, expr); @@ -93,39 +118,54 @@ async fn simple_dataframe_parse_sql_expr_demo() -> Result<()> { async fn query_parquet_demo() -> Result<()> { let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let df = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) .await?; + let df = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) + .await?; + let df = df .clone() - .select(vec![ - df.parse_sql_expr("int_col")?, - df.parse_sql_expr("double_col")?, - ])? - .filter(df.parse_sql_expr("int_col < 5 OR double_col = 8.0")?)? + .select(vec![df.parse_sql_expr("car")?, df.parse_sql_expr("speed")?])? + .filter(df.parse_sql_expr("car = 'red' OR speed > 1.0")?)? .aggregate( - vec![df.parse_sql_expr("double_col")?], - vec![df.parse_sql_expr("SUM(int_col) as sum_int_col")?], + vec![df.parse_sql_expr("car")?], + vec![df.parse_sql_expr("SUM(speed) as sum_speed")?], )? // Directly parsing the SQL text into a sort expression is not supported yet, so // construct it programmatically - .sort(vec![col("double_col").sort(false, false)])? + .sort(vec![col("car").sort(false, false)])? .limit(0, Some(1))?; let result = df.collect().await?; assert_batches_eq!( &[ - "+------------+-------------+", - "| double_col | sum_int_col |", - "+------------+-------------+", - "| 10.1 | 4 |", - "+------------+-------------+", + "+-----+--------------------+", + "| car | sum_speed |", + "+-----+--------------------+", + "| red | 162.49999999999997 |", + "+-----+--------------------+" ], &result ); @@ -135,17 +175,35 @@ async fn query_parquet_demo() -> Result<()> { /// DataFusion can parse a SQL text and convert it back to SQL using [`Unparser`]. async fn round_trip_parse_sql_expr_demo() -> Result<()> { - let sql = "((int_col < 5) OR (double_col = 8))"; + let sql = "((car = 'red') OR (speed > 1.0))"; let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let df = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. 
+ // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) .await?; + let df = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) + .await?; + let parsed_expr = df.parse_sql_expr(sql)?; let unparser = Unparser::default(); @@ -158,7 +216,7 @@ async fn round_trip_parse_sql_expr_demo() -> Result<()> { // difference in precedence rules between DataFusion and target engines. let unparser = Unparser::default().with_pretty(true); - let pretty = "int_col < 5 OR double_col = 8"; + let pretty = "car = 'red' OR speed > 1.0"; let pretty_round_trip_sql = unparser.expr_to_sql(&parsed_expr)?.to_string(); assert_eq!(pretty, pretty_round_trip_sql); diff --git a/datafusion-examples/examples/query_planning/plan_to_sql.rs b/datafusion-examples/examples/query_planning/plan_to_sql.rs index 756cc80b8f3c7..ba2db08780c23 100644 --- a/datafusion-examples/examples/query_planning/plan_to_sql.rs +++ b/datafusion-examples/examples/query_planning/plan_to_sql.rs @@ -17,7 +17,13 @@ //! See `main.rs` for how to run it. +use std::fmt; +use std::path::PathBuf; +use std::sync::Arc; + use datafusion::common::DFSchemaRef; +use datafusion::common::ScalarValue; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::Result; use datafusion::logical_expr::sqlparser::ast::Statement; use datafusion::logical_expr::{ @@ -35,8 +41,8 @@ use datafusion::sql::unparser::extension_unparser::{ UnparseToStatementResult, UnparseWithinStatementResult, }; use datafusion::sql::unparser::{Unparser, plan_to_sql}; -use std::fmt; -use std::sync::Arc; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// This example demonstrates the programmatic construction of SQL strings using /// the DataFusion Expr [`Expr`] and LogicalPlan [`LogicalPlan`] API. @@ -114,21 +120,38 @@ fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> { async fn simple_plan_to_sql_demo() -> Result<()> { let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let df = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) + .await?; + + let df = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) .await? 
- .select_columns(&["id", "int_col", "double_col", "date_string_col"])?; + .select_columns(&["car", "speed", "time"])?; // Convert the data frame to a SQL string let sql = plan_to_sql(df.logical_plan())?.to_string(); assert_eq!( sql, - r#"SELECT "?table?".id, "?table?".int_col, "?table?".double_col, "?table?".date_string_col FROM "?table?""# + r#"SELECT "?table?".car, "?table?".speed, "?table?"."time" FROM "?table?""# ); Ok(()) @@ -139,35 +162,52 @@ async fn simple_plan_to_sql_demo() -> Result<()> { async fn round_trip_plan_to_sql_demo() -> Result<()> { let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; // register parquet file with the execution context ctx.register_parquet( - "alltypes_plain", - &format!("{testdata}/alltypes_plain.parquet"), + "cars", + out_dir.to_str().unwrap(), ParquetReadOptions::default(), ) .await?; // create a logical plan from a SQL string and then programmatically add new filters + // select car, speed, time from cars where speed > 1 and car = 'red' let df = ctx // Use SQL to read some data from the parquet file - .sql( - "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \ - FROM alltypes_plain", - ) + .sql("SELECT car, speed, time FROM cars") .await? - // Add id > 1 and tinyint_col < double_col filter + // Add speed > 1 and car = 'red' filter .filter( - col("id") + col("speed") .gt(lit(1)) - .and(col("tinyint_col").lt(col("double_col"))), + .and(col("car").eq(lit(ScalarValue::Utf8(Some("red".to_string()))))), )?; let sql = plan_to_sql(df.logical_plan())?.to_string(); assert_eq!( sql, - r#"SELECT alltypes_plain.int_col, alltypes_plain.double_col, CAST(alltypes_plain.date_string_col AS VARCHAR) FROM alltypes_plain WHERE ((alltypes_plain.id > 1) AND (alltypes_plain.tinyint_col < alltypes_plain.double_col))"# + r#"SELECT cars.car, cars.speed, cars."time" FROM cars WHERE ((cars.speed > 1) AND (cars.car = 'red'))"# ); Ok(()) @@ -231,14 +271,32 @@ impl UserDefinedLogicalNodeUnparser for PlanToStatement { /// It can be unparse as a statement that reads from the same parquet file. async fn unparse_my_logical_plan_as_statement() -> Result<()> { let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let inner_plan = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. 
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) + .await?; + + let inner_plan = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) .await? - .select_columns(&["id", "int_col", "double_col", "date_string_col"])? + .select_columns(&["car", "speed", "time"])? .into_unoptimized_plan(); let node = Arc::new(MyLogicalPlan { input: inner_plan }); @@ -249,7 +307,7 @@ async fn unparse_my_logical_plan_as_statement() -> Result<()> { let sql = unparser.plan_to_sql(&my_plan)?.to_string(); assert_eq!( sql, - r#"SELECT "?table?".id, "?table?".int_col, "?table?".double_col, "?table?".date_string_col FROM "?table?""# + r#"SELECT "?table?".car, "?table?".speed, "?table?"."time" FROM "?table?""# ); Ok(()) } @@ -284,14 +342,32 @@ impl UserDefinedLogicalNodeUnparser for PlanToSubquery { /// It can be unparse as a subquery that reads from the same parquet file, with some columns projected. async fn unparse_my_logical_plan_as_subquery() -> Result<()> { let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let inner_plan = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) + .await?; + + let inner_plan = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) .await? - .select_columns(&["id", "int_col", "double_col", "date_string_col"])? + .select_columns(&["car", "speed", "time"])? .into_unoptimized_plan(); let node = Arc::new(MyLogicalPlan { input: inner_plan }); @@ -299,8 +375,8 @@ async fn unparse_my_logical_plan_as_subquery() -> Result<()> { let my_plan = LogicalPlan::Extension(Extension { node }); let plan = LogicalPlanBuilder::from(my_plan) .project(vec![ - col("id").alias("my_id"), - col("int_col").alias("my_int"), + col("car").alias("my_car"), + col("speed").alias("my_speed"), ])? 
.build()?; let unparser = @@ -308,8 +384,8 @@ async fn unparse_my_logical_plan_as_subquery() -> Result<()> { let sql = unparser.plan_to_sql(&plan)?.to_string(); assert_eq!( sql, - "SELECT \"?table?\".id AS my_id, \"?table?\".int_col AS my_int FROM \ - (SELECT \"?table?\".id, \"?table?\".int_col, \"?table?\".double_col, \"?table?\".date_string_col FROM \"?table?\")", + "SELECT \"?table?\".car AS my_car, \"?table?\".speed AS my_speed FROM \ + (SELECT \"?table?\".car, \"?table?\".speed, \"?table?\".\"time\" FROM \"?table?\")", ); Ok(()) } diff --git a/datafusion-examples/examples/query_planning/planner_api.rs b/datafusion-examples/examples/query_planning/planner_api.rs index 9b8aa1c2fe649..46174709f64f5 100644 --- a/datafusion-examples/examples/query_planning/planner_api.rs +++ b/datafusion-examples/examples/query_planning/planner_api.rs @@ -17,11 +17,16 @@ //! See `main.rs` for how to run it. +use std::path::PathBuf; + +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::Result; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::displayable; use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion::prelude::*; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// This example demonstrates the process of converting logical plan /// into physical execution plans using DataFusion. @@ -37,25 +42,40 @@ use datafusion::prelude::*; pub async fn planner_api() -> Result<()> { // Set up a DataFusion context and load a Parquet file let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - let df = ctx - .read_parquet( - &format!("{testdata}/alltypes_plain.parquet"), - ParquetReadOptions::default(), + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, ) .await?; + let df = ctx + .read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default()) + .await?; + // Construct the input logical plan using DataFrame API let df = df .clone() - .select(vec![ - df.parse_sql_expr("int_col")?, - df.parse_sql_expr("double_col")?, - ])? - .filter(df.parse_sql_expr("int_col < 5 OR double_col = 8.0")?)? + .select(vec![df.parse_sql_expr("car")?, df.parse_sql_expr("speed")?])? + .filter(df.parse_sql_expr("car = 'red' OR speed > 1.0")?)? .aggregate( - vec![df.parse_sql_expr("double_col")?], - vec![df.parse_sql_expr("SUM(int_col) as sum_int_col")?], + vec![df.parse_sql_expr("car")?], + vec![df.parse_sql_expr("SUM(speed) as sum_speed")?], )? .limit(0, Some(1))?; let logical_plan = df.logical_plan().clone(); diff --git a/datafusion-examples/examples/query_planning/thread_pools.rs b/datafusion-examples/examples/query_planning/thread_pools.rs index 6fc7d51e91c1f..292e838c8e4c6 100644 --- a/datafusion-examples/examples/query_planning/thread_pools.rs +++ b/datafusion-examples/examples/query_planning/thread_pools.rs @@ -37,15 +37,20 @@ //! //! 
[Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes +use std::path::PathBuf; +use std::sync::Arc; + use arrow::util::pretty::pretty_format_batches; use datafusion::common::runtime::JoinSet; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::error::Result; use datafusion::execution::SendableRecordBatchStream; use datafusion::prelude::*; use futures::stream::StreamExt; use object_store::client::SpawnedReqwestConnector; use object_store::http::HttpBuilder; -use std::sync::Arc; +use tempfile::TempDir; +use tokio::fs::create_dir_all; use tokio::runtime::Handle; use tokio::sync::Notify; use url::Url; @@ -70,10 +75,29 @@ pub async fn thread_pools() -> Result<()> { // The first two examples read local files. Enabling the URL table feature // lets us treat filenames as tables in SQL. let ctx = SessionContext::new().enable_url_table(); - let sql = format!( - "SELECT * FROM '{}/alltypes_plain.parquet'", - datafusion::test_util::parquet_test_data() - ); + + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; + + let sql = format!("SELECT * FROM '{}'", out_dir.to_str().unwrap()); // Run a query on the current runtime. Calling `await` means the future // (in this case the `async` function and all spawned work in DataFusion diff --git a/datafusion-examples/examples/sql_ops/query.rs b/datafusion-examples/examples/sql_ops/query.rs index 90d0c3ca34a00..d9c9a1c0d1274 100644 --- a/datafusion-examples/examples/sql_ops/query.rs +++ b/datafusion-examples/examples/sql_ops/query.rs @@ -17,18 +17,22 @@ //! See `main.rs` for how to run it. +use std::path::PathBuf; +use std::sync::Arc; + use datafusion::arrow::array::{UInt8Array, UInt64Array}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::catalog::MemTable; use datafusion::common::{assert_batches_eq, exec_datafusion_err}; -use datafusion::datasource::MemTable; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::*; use object_store::local::LocalFileSystem; -use std::path::Path; -use std::sync::Arc; +use tempfile::TempDir; +use tokio::fs::create_dir_all; /// Examples of various ways to execute queries using SQL /// @@ -113,20 +117,38 @@ async fn query_parquet() -> Result<()> { // create local execution context let ctx = SessionContext::new(); - let test_data = datafusion::test_util::parquet_test_data(); + // Load CSV into an in-memory DataFrame, then materialize it to Parquet. + // This replaces a static parquet fixture and makes the example self-contained + // without requiring DataFusion test files. 
+ let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + let csv_df = ctx + .read_csv(path.to_str().unwrap(), CsvReadOptions::default()) + .await?; + let tmp_source = TempDir::new()?; + let out_dir = tmp_source.path().join("parquet_source"); + create_dir_all(&out_dir).await?; + csv_df + .write_parquet( + out_dir.to_str().unwrap(), + DataFrameWriteOptions::default(), + None, + ) + .await?; // Configure listing options let file_format = ParquetFormat::default().with_enable_pruning(true); - let listing_options = ListingOptions::new(Arc::new(file_format)) - // This is a workaround for this example since `test_data` contains - // many different parquet different files, - // in practice use FileType::PARQUET.get_ext(). - .with_file_extension("alltypes_plain.parquet"); + let listing_options = + ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet"); + + let table_path = format!("file://{}", out_dir.to_str().unwrap()); // First example were we use an absolute path, which requires no additional setup. ctx.register_listing_table( "my_table", - &format!("file://{test_data}/"), + &table_path, listing_options.clone(), None, None, @@ -139,6 +161,7 @@ async fn query_parquet() -> Result<()> { .sql( "SELECT * \ FROM my_table \ + ORDER BY speed \ LIMIT 1", ) .await?; @@ -147,21 +170,21 @@ async fn query_parquet() -> Result<()> { let results = df.collect().await?; assert_batches_eq!( [ - "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", - "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", - "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", - "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |", - "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + "+-----+-------+---------------------+", + "| car | speed | time |", + "+-----+-------+---------------------+", + "| red | 0.0 | 1996-04-12T12:05:15 |", + "+-----+-------+---------------------+", ], &results ); - // Second example were we temporarily move into the test data's parent directory and - // simulate a relative path, this requires registering an ObjectStore. + // Second example where we change the current working directory and explicitly + // register a local filesystem object store. This demonstrates how listing tables + // resolve paths via an ObjectStore, even when using filesystem-backed data. 
let cur_dir = std::env::current_dir()?; - let test_data_path = Path::new(&test_data); - let test_data_path_parent = test_data_path + let test_data_path_parent = out_dir .parent() .ok_or(exec_datafusion_err!("test_data path needs a parent"))?; @@ -177,7 +200,7 @@ async fn query_parquet() -> Result<()> { // for the query ctx.register_listing_table( "relative_table", - "./data", + out_dir.to_str().unwrap(), listing_options.clone(), None, None, @@ -189,6 +212,7 @@ async fn query_parquet() -> Result<()> { .sql( "SELECT * \ FROM relative_table \ + ORDER BY speed \ LIMIT 1", ) .await?; @@ -197,11 +221,11 @@ async fn query_parquet() -> Result<()> { let results = df.collect().await?; assert_batches_eq!( [ - "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", - "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", - "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", - "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |", - "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + "+-----+-------+---------------------+", + "| car | speed | time |", + "+-----+-------+---------------------+", + "| red | 0.0 | 1996-04-12T12:05:15 |", + "+-----+-------+---------------------+", ], &results ); diff --git a/datafusion-examples/examples/udf/advanced_udwf.rs b/datafusion-examples/examples/udf/advanced_udwf.rs index e8d3a75b29dec..15eaf4e4521c3 100644 --- a/datafusion-examples/examples/udf/advanced_udwf.rs +++ b/datafusion-examples/examples/udf/advanced_udwf.rs @@ -17,7 +17,8 @@ //! See `main.rs` for how to run it. -use std::{any::Any, fs::File, io::Write, sync::Arc}; +use std::path::PathBuf; +use std::{any::Any, sync::Arc}; use arrow::datatypes::Field; use arrow::{ @@ -40,7 +41,6 @@ use datafusion::logical_expr::{ use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::*; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use tempfile::tempdir; /// This example shows how to use the full WindowUDFImpl API to implement a user /// defined window function. As in the `simple_udwf.rs` example, this struct implements @@ -230,44 +230,12 @@ async fn create_context() -> Result { // declare a new context. 
In spark API, this corresponds to a new spark SQL session let ctx = SessionContext::new(); - // content from file 'datafusion/core/tests/data/cars.csv' - let csv_data = r#"car,speed,time -red,20.0,1996-04-12T12:05:03.000000000 -red,20.3,1996-04-12T12:05:04.000000000 -red,21.4,1996-04-12T12:05:05.000000000 -red,21.5,1996-04-12T12:05:06.000000000 -red,19.0,1996-04-12T12:05:07.000000000 -red,18.0,1996-04-12T12:05:08.000000000 -red,17.0,1996-04-12T12:05:09.000000000 -red,7.0,1996-04-12T12:05:10.000000000 -red,7.1,1996-04-12T12:05:11.000000000 -red,7.2,1996-04-12T12:05:12.000000000 -red,3.0,1996-04-12T12:05:13.000000000 -red,1.0,1996-04-12T12:05:14.000000000 -red,0.0,1996-04-12T12:05:15.000000000 -green,10.0,1996-04-12T12:05:03.000000000 -green,10.3,1996-04-12T12:05:04.000000000 -green,10.4,1996-04-12T12:05:05.000000000 -green,10.5,1996-04-12T12:05:06.000000000 -green,11.0,1996-04-12T12:05:07.000000000 -green,12.0,1996-04-12T12:05:08.000000000 -green,14.0,1996-04-12T12:05:09.000000000 -green,15.0,1996-04-12T12:05:10.000000000 -green,15.1,1996-04-12T12:05:11.000000000 -green,15.2,1996-04-12T12:05:12.000000000 -green,8.0,1996-04-12T12:05:13.000000000 -green,2.0,1996-04-12T12:05:14.000000000 -"#; - let dir = tempdir()?; - let file_path = dir.path().join("cars.csv"); - { - let mut file = File::create(&file_path)?; - // write CSV data - file.write_all(csv_data.as_bytes())?; - } // scope closes the file - let file_path = file_path.to_str().unwrap(); - - ctx.register_csv("cars", file_path, CsvReadOptions::new()) + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); + + ctx.register_csv("cars", path.to_str().unwrap(), CsvReadOptions::new()) .await?; Ok(ctx) diff --git a/datafusion-examples/examples/udf/simple_udtf.rs b/datafusion-examples/examples/udf/simple_udtf.rs index 087b8ba73af5c..3d8b7a87b766b 100644 --- a/datafusion-examples/examples/udf/simple_udtf.rs +++ b/datafusion-examples/examples/udf/simple_udtf.rs @@ -37,6 +37,7 @@ use datafusion::prelude::*; use std::fs::File; use std::io::Seek; use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; // To define your own table function, you only need to do the following 3 things: // 1. Implement your own [`TableProvider`] @@ -51,18 +52,26 @@ pub async fn simple_udtf() -> Result<()> { // register the table function that will be called in SQL statements by `read_csv` ctx.register_udtf("read_csv", Arc::new(LocalCsvTableFunc {})); - let testdata = datafusion::test_util::arrow_test_data(); - let csv_file = format!("{testdata}/csv/aggregate_test_100.csv"); + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("data") + .join("csv") + .join("cars.csv"); // Pass 2 arguments, read csv with at most 2 rows (simplify logic makes 1+1 --> 2) let df = ctx - .sql(format!("SELECT * FROM read_csv('{csv_file}', 1 + 1);").as_str()) + .sql( + format!( + "SELECT * FROM read_csv('{}', 1 + 1);", + path.to_str().unwrap() + ) + .as_str(), + ) .await?; df.show().await?; // just run, return all rows let df = ctx - .sql(format!("SELECT * FROM read_csv('{csv_file}');").as_str()) + .sql(format!("SELECT * FROM read_csv('{}');", path.to_str().unwrap()).as_str()) .await?; df.show().await?;