Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ cargo run --example dataframe -- dataframe

#### Category: Single Process

| Subcommand | File Path | Description |
| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------ |
| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output |
| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs |
| Subcommand | File Path | Description |
| --------------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------- |
| cache_factory | [`dataframe/cache_factory.rs`](examples/dataframe/cache_factory.rs) | Custom lazy caching for DataFrames using `CacheFactory` |
| dataframe | [`dataframe/dataframe.rs`](examples/dataframe/dataframe.rs) | Query DataFrames from various sources and write output |
| deserialize_to_struct | [`dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs) | Convert Arrow arrays into Rust structs |

## Execution Monitoring Examples

Expand Down
25 changes: 25 additions & 0 deletions datafusion-examples/data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Example datasets

| Filename | Path | Description |
| ----------- | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `cars.csv` | [`data/csv/cars.csv`](./csv/cars.csv) | Time-series–like dataset containing car identifiers, speed values, and timestamps. Used in window function and time-based query examples (e.g. ordering, window frames). |
| `regex.csv` | [`data/csv/regex.csv`](./csv/regex.csv) | Dataset for regular expression examples. Contains input values, regex patterns, replacement strings, and optional flags. Covers ASCII, Unicode, and locale-specific text processing. |
26 changes: 26 additions & 0 deletions datafusion-examples/data/csv/cars.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
car,speed,time
red,20.0,1996-04-12T12:05:03.000000000
red,20.3,1996-04-12T12:05:04.000000000
red,21.4,1996-04-12T12:05:05.000000000
red,21.5,1996-04-12T12:05:06.000000000
red,19.0,1996-04-12T12:05:07.000000000
red,18.0,1996-04-12T12:05:08.000000000
red,17.0,1996-04-12T12:05:09.000000000
red,7.0,1996-04-12T12:05:10.000000000
red,7.1,1996-04-12T12:05:11.000000000
red,7.2,1996-04-12T12:05:12.000000000
red,3.0,1996-04-12T12:05:13.000000000
red,1.0,1996-04-12T12:05:14.000000000
red,0.0,1996-04-12T12:05:15.000000000
green,10.0,1996-04-12T12:05:03.000000000
green,10.3,1996-04-12T12:05:04.000000000
green,10.4,1996-04-12T12:05:05.000000000
green,10.5,1996-04-12T12:05:06.000000000
green,11.0,1996-04-12T12:05:07.000000000
green,12.0,1996-04-12T12:05:08.000000000
green,14.0,1996-04-12T12:05:09.000000000
green,15.0,1996-04-12T12:05:10.000000000
green,15.1,1996-04-12T12:05:11.000000000
green,15.2,1996-04-12T12:05:12.000000000
green,8.0,1996-04-12T12:05:13.000000000
green,2.0,1996-04-12T12:05:14.000000000
12 changes: 12 additions & 0 deletions datafusion-examples/data/csv/regex.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
values,patterns,replacement,flags
abc,^(a),bb\1bb,i
ABC,^(A).*,B,i
aBc,(b|d),e,i
AbC,(B|D),e,
aBC,^(b|c),d,
4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
Düsseldorf,[\p{Letter}-]+,München,
Москва,[\p{L}-]+,Moscow,
Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
اليوم,^\p{Arabic}+$,Today,
33 changes: 7 additions & 26 deletions datafusion-examples/examples/builtin_functions/regexp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
Expand All @@ -18,12 +17,11 @@

//! See `main.rs` for how to run it.

use std::{fs::File, io::Write};
use std::path::PathBuf;

use datafusion::common::{assert_batches_eq, assert_contains};
use datafusion::error::Result;
use datafusion::prelude::*;
use tempfile::tempdir;

/// This example demonstrates how to use the regexp_* functions
///
Expand All @@ -35,29 +33,12 @@ use tempfile::tempdir;
/// https://docs.rs/regex/latest/regex/#grouping-and-flags
pub async fn regexp() -> Result<()> {
let ctx = SessionContext::new();
// content from file 'datafusion/physical-expr/tests/data/regex.csv'
let csv_data = r#"values,patterns,replacement,flags
abc,^(a),bb\1bb,i
ABC,^(A).*,B,i
aBc,(b|d),e,i
AbC,(B|D),e,
aBC,^(b|c),d,
4000,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
4010,\b4([1-9]\d\d|\d[1-9]\d|\d\d[1-9])\b,xyz,
Düsseldorf,[\p{Letter}-]+,München,
Москва,[\p{L}-]+,Moscow,
Köln,[a-zA-Z]ö[a-zA-Z]{2},Koln,
اليوم,^\p{Arabic}+$,Today,"#;
let dir = tempdir()?;
let file_path = dir.path().join("regex.csv");
{
let mut file = File::create(&file_path)?;
// write CSV data
file.write_all(csv_data.as_bytes())?;
} // scope closes the file
let file_path = file_path.to_str().unwrap();

ctx.register_csv("examples", file_path, CsvReadOptions::new())
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("regex.csv");

ctx.register_csv("examples", path.to_str().unwrap(), CsvReadOptions::new())
.await?;

//
Expand Down
42 changes: 25 additions & 17 deletions datafusion-examples/examples/custom_data_source/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

//! See `main.rs` for how to run it.

use std::path::PathBuf;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::config::CsvOptions;
use datafusion::{
assert_batches_eq,
Expand All @@ -31,7 +32,6 @@ use datafusion::{
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use datafusion::datasource::physical_plan::FileScanConfigBuilder;
Expand All @@ -50,12 +50,20 @@ pub async fn csv_json_opener() -> Result<()> {

async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();

let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{testdata}/csv/aggregate_test_100.csv");
let schema = Arc::new(Schema::new(vec![
Field::new("car", DataType::Utf8, false),
Field::new("speed", DataType::Float64, false),
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let path = std::path::Path::new(&path).canonicalize()?;
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("cars.csv");

let options = CsvOptions {
has_header: Some(true),
Expand All @@ -71,7 +79,7 @@ async fn csv_opener() -> Result<()> {

let scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_projection_indices(Some(vec![12, 0]))?
.with_projection_indices(Some(vec![0, 1]))?
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build();
Expand All @@ -89,15 +97,15 @@ async fn csv_opener() -> Result<()> {
}
assert_batches_eq!(
&[
"+--------------------------------+----+",
"| c13 | c1 |",
"+--------------------------------+----+",
"| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c |",
"| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d |",
"| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b |",
"| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a |",
"| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b |",
"+--------------------------------+----+",
"+-----+-------+",
"| car | speed |",
"+-----+-------+",
"| red | 20.0 |",
"| red | 20.3 |",
"| red | 21.4 |",
"| red | 21.5 |",
"| red | 19.0 |",
"+-----+-------+",
],
&result
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

//! See `main.rs` for how to run it.

use datafusion::common::test_util::datafusion_test_data;
use std::path::PathBuf;

use datafusion::error::Result;
use datafusion::prelude::*;

Expand All @@ -27,33 +28,36 @@ pub async fn csv_sql_streaming() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

let testdata = datafusion_test_data();
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("data")
.join("csv")
.join("cars.csv");

// Register a table source and tell DataFusion the file is ordered by `ts ASC`.
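// Register a table source and tell DataFusion the file is ordered by `car ASC`.
// Note that it is the responsibility of the user to make sure
// that the file indeed satisfies this condition, or else incorrect answers may be produced.
// NOTE(review): `cars.csv` lists its `red` rows before its `green` rows, i.e. it is grouped by
// `car` but not in ascending order — confirm that the declared sort order is intended.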
let asc = true;
let nulls_first = true;
let sort_expr = vec![col("ts").sort(asc, nulls_first)];
let sort_expr = vec![col("car").sort(asc, nulls_first)];
// register csv file with the execution context
ctx.register_csv(
"ordered_table",
&format!("{testdata}/window_1.csv"),
path.to_str().unwrap(),
CsvReadOptions::new().file_sort_order(vec![sort_expr]),
)
.await?;

// execute the query
// Following query can be executed with unbounded sources because group by expressions (e.g ts) is
// The following query can be executed with unbounded sources because the group by expression (e.g. `car`) is
// already ordered at the source.
//
// Unbounded sources means that if the input came from a "never ending" source (such as a FIFO
// file on unix) the query could produce results incrementally as data was read.
let df = ctx
.sql(
"SELECT ts, MIN(inc_col), MAX(inc_col) \
"SELECT car, MIN(speed), MAX(speed) \
FROM ordered_table \
GROUP BY ts",
GROUP BY car",
)
.await?;

Expand All @@ -64,7 +68,7 @@ pub async fn csv_sql_streaming() -> Result<()> {
// its result in streaming fashion, because its required ordering is already satisfied at the source.
let df = ctx
.sql(
"SELECT ts, SUM(inc_col) OVER(ORDER BY ts ASC) \
"SELECT car, SUM(speed) OVER(ORDER BY car ASC) \
FROM ordered_table",
)
.await?;
Expand Down
50 changes: 36 additions & 14 deletions datafusion-examples/examples/data_io/parquet_encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,49 @@

//! See `main.rs` for how to run it.

use std::path::PathBuf;
use std::sync::Arc;

use datafusion::common::DataFusionError;
use datafusion::config::{ConfigFileEncryptionProperties, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::logical_expr::{col, lit};
use datafusion::parquet::encryption::decrypt::FileDecryptionProperties;
use datafusion::parquet::encryption::encrypt::FileEncryptionProperties;
use datafusion::prelude::CsvReadOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use std::sync::Arc;
use tempfile::TempDir;
use tokio::fs::create_dir_all;

/// Read and write encrypted Parquet files using DataFusion
pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
// The SessionContext is the main high level API for interacting with DataFusion
let ctx = SessionContext::new();

// Find the local path of "alltypes_plain.parquet"
let testdata = datafusion::test_util::parquet_test_data();
let filename = &format!("{testdata}/alltypes_plain.parquet");
// Load CSV into an in-memory DataFrame, then materialize it to Parquet.
// This replaces a static parquet fixture and makes the example self-contained
// without requiring DataFusion test files.
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can change this example to read in the cars.csv file and then write it back out as an encrypted parquet file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense.

I agree that using fewer, clearer datasets in the examples is a good direction. I can look into rewriting this parquet_encrypted example to read from cars.csv and then write the encrypted parquet output, instead of relying on alltypes_plain.parquet.

I’ll prototype that approach and report back once I confirm everything works cleanly with the encryption workflow.

.join("data")
.join("csv")
.join("cars.csv");
let csv_df = ctx
.read_csv(path.to_str().unwrap(), CsvReadOptions::default())
.await?;
let tmp_source = TempDir::new()?;
let out_dir = tmp_source.path().join("parquet_source");
create_dir_all(&out_dir).await?;
csv_df
.write_parquet(
out_dir.to_str().unwrap(),
DataFrameWriteOptions::default(),
None,
)
.await?;

// Read the sample parquet file
let parquet_df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.read_parquet(out_dir.to_str().unwrap(), ParquetReadOptions::default())
.await?;

// Show information from the dataframe
Expand All @@ -52,27 +73,27 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
let (encrypt, decrypt) = setup_encryption(&parquet_df)?;

// Create a temporary file location for the encrypted parquet file
let tmp_dir = TempDir::new()?;
let tempfile = tmp_dir.path().join("alltypes_plain-encrypted.parquet");
let tempfile_str = tempfile.into_os_string().into_string().unwrap();
let tempfile = tmp_source.path().join("cars_encrypted");

// Write encrypted parquet
let mut options = TableParquetOptions::default();
options.crypto.file_encryption = Some(ConfigFileEncryptionProperties::from(&encrypt));
parquet_df
.write_parquet(
tempfile_str.as_str(),
tempfile.to_str().unwrap(),
DataFrameWriteOptions::new().with_single_file_output(true),
Some(options),
)
.await?;

// Read encrypted parquet
// Read encrypted parquet back as a DataFrame using matching decryption config
let ctx: SessionContext = SessionContext::new();
let read_options =
ParquetReadOptions::default().file_decryption_properties((&decrypt).into());

let encrypted_parquet_df = ctx.read_parquet(tempfile_str, read_options).await?;
let encrypted_parquet_df = ctx
.read_parquet(tempfile.to_str().unwrap(), read_options)
.await?;

// Show information from the dataframe
println!(
Expand All @@ -91,11 +112,12 @@ async fn query_dataframe(df: &DataFrame) -> Result<(), DataFusionError> {
df.clone().describe().await?.show().await?;

// Select three columns and filter the results
// so that only rows where id > 1 are returned
// so that only rows where speed > 5 are returned
// select car, speed, time from t where speed > 5
println!("\nSelected rows and columns:");
df.clone()
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(5)))?
.select_columns(&["car", "speed", "time"])?
.filter(col("speed").gt(lit(5)))?
.show()
.await?;

Expand Down
Loading