Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::stats::Precision;
use datafusion_common::{
Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err,
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
Expand All @@ -38,7 +38,10 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_expr::{
Expr, ProjectionExprs, TableProviderFilterPushDown, TableType,
projection_exprs_from_schema_and_indices,
};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
Expand Down Expand Up @@ -384,8 +387,11 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let projection_exprs = projection
.map(|p| projection_exprs_from_schema_and_indices(&self.schema(), p))
.transpose()?;
let options = ScanArgs::default()
.with_projection(projection.map(|p| p.as_slice()))
.with_projection(projection_exprs.as_deref())
.with_filters(Some(filters))
.with_limit(limit);
Ok(self.scan_with_args(state, options).await?.into_inner())
Expand Down Expand Up @@ -434,8 +440,7 @@ impl TableProvider for ListingTable {

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
return Ok(ScanResult::new(Arc::new(EmptyExec::new(self.schema()))));
}

let output_ordering = self.try_create_output_ordering(state.execution_props())?;
Expand Down Expand Up @@ -478,6 +483,12 @@ impl TableProvider for ListingTable {

let file_source = self.create_file_source();

// Convert projection expressions to indices for FileScanConfigBuilder
let projection_indices = projection
.as_ref()
.map(|p| p.projection_column_indices(&self.schema()))
.transpose()?;

// create the execution plan
let plan = self
.options
Expand All @@ -488,7 +499,7 @@ impl TableProvider for ListingTable {
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection_indices(projection)?
.with_projection_indices(projection_indices)?
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
Expand Down
23 changes: 19 additions & 4 deletions datafusion/catalog/src/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType};
use datafusion_common::{Column, error::Result};
use datafusion_expr::{
Expr, LogicalPlan, ProjectionExprs, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::work_table::WorkTableExec;
use itertools::Itertools;

use crate::{ScanArgs, ScanResult, Session, TableProvider};

Expand Down Expand Up @@ -88,8 +91,17 @@ impl TableProvider for CteWorkTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = self.schema();
let projection_exprs = projection.as_ref().map(|p| {
p.as_slice()
.iter()
.map(|i| {
Expr::Column(Column::from_name(schema.field(*i).name().to_string()))
})
.collect_vec()
});
let options = ScanArgs::default()
.with_projection(projection.map(|p| p.as_slice()))
.with_projection(projection_exprs.as_ref().map(|p| p.as_slice()))
.with_filters(Some(filters))
.with_limit(limit);
Ok(self.scan_with_args(state, options).await?.into_inner())
Expand All @@ -100,10 +112,13 @@ impl TableProvider for CteWorkTable {
_state: &dyn Session,
args: ScanArgs<'a>,
) -> Result<ScanResult> {
let schema = self.schema();
Ok(ScanResult::new(Arc::new(WorkTableExec::new(
self.name.clone(),
Arc::clone(&self.table_schema),
args.projection().map(|p| p.to_vec()),
args.projection()
.map(|p| p.projection_column_indices(&schema))
.transpose()?,
)?)))
}

Expand Down
49 changes: 40 additions & 9 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{Constraints, Statistics, not_impl_err};
use datafusion_expr::Expr;
use datafusion_expr::{Expr, ProjectionExprs};

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
Expand Down Expand Up @@ -194,7 +194,10 @@ pub trait TableProvider: Debug + Sync + Send {
args: ScanArgs<'a>,
) -> Result<ScanResult> {
let filters = args.filters().unwrap_or(&[]);
let projection = args.projection().map(|p| p.to_vec());
let projection = args
.projection()
.map(|p| p.projection_column_indices(&self.schema()))
.transpose()?;
let limit = args.limit();
let plan = self
.scan(state, projection.as_ref(), filters, limit)
Expand Down Expand Up @@ -359,19 +362,19 @@ pub trait TableProvider: Debug + Sync + Send {
#[derive(Debug, Clone, Default)]
pub struct ScanArgs<'a> {
filters: Option<&'a [Expr]>,
projection: Option<&'a [usize]>,
projection: Option<&'a [Expr]>,
limit: Option<usize>,
}

impl<'a> ScanArgs<'a> {
/// Set the column projection for the scan.
///
/// The projection is a list of column indices from [`TableProvider::schema`]
/// that should be included in the scan results. If `None`, all columns are included.
/// The projection is a list of expressions applied to
/// [`TableProvider::schema`] which may include columns and more complex expressions.
///
/// # Arguments
/// * `projection` - Optional slice of column indices to project
pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
/// * `projection` - Optional slice of projection expressions
pub fn with_projection(mut self, projection: Option<&'a [Expr]>) -> Self {
self.projection = projection;
self
}
Expand All @@ -380,7 +383,7 @@ impl<'a> ScanArgs<'a> {
///
/// Returns a reference to the projection column indices, or `None` if
/// no projection was specified (meaning all columns should be included).
pub fn projection(&self) -> Option<&'a [usize]> {
pub fn projection(&self) -> Option<&'a [Expr]> {
self.projection
}

Expand Down Expand Up @@ -429,6 +432,14 @@ impl<'a> ScanArgs<'a> {
pub struct ScanResult {
/// The ExecutionPlan to run.
plan: Arc<dyn ExecutionPlan>,
/// Filters that we re not completely handled by the scan
/// either via statistics or other plan-time optimizations,
/// or by binding them to the returned plan.
unhandled_filters: Vec<Expr>,
/// Projections that were not completely handled by the scan
/// either via statistics or other plan-time optimizations,
/// or by binding them to the returned plan.
unhandled_projections: Vec<usize>,
}

impl ScanResult {
Expand All @@ -437,7 +448,11 @@ impl ScanResult {
/// # Arguments
/// * `plan` - The execution plan that will perform the table scan
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self { plan }
Self {
plan,
unhandled_filters: vec![],
unhandled_projections: vec![],
}
}

/// Get a reference to the execution plan for this scan result.
Expand All @@ -455,6 +470,22 @@ impl ScanResult {
pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
self.plan
}

/// Record filters that were not handled by the scan.
/// If called multiple times this method appends the provided
/// filters to the existing unhandled filters in the [`ScanResult`].
pub fn with_unhandled_filters(mut self, filters: Vec<Expr>) -> Self {
self.unhandled_filters.extend(filters);
self
}

/// Record projections that were not handled by the scan.
/// If called multiple times this method appends the provided
/// projections to the existing unhandled projections in the [`ScanResult`].
pub fn with_unhandled_projections(mut self, projections: Vec<usize>) -> Self {
self.unhandled_projections.extend(projections);
self
}
}

impl From<Arc<dyn ExecutionPlan>> for ScanResult {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::execution::RecordBatchStream;
pub fn scan_empty(
name: Option<&str>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
projection: Option<Vec<Expr>>,
) -> Result<LogicalPlanBuilder> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema));
Expand All @@ -79,7 +79,7 @@ pub fn scan_empty(
pub fn scan_empty_with_partitions(
name: Option<&str>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
projection: Option<Vec<Expr>>,
partitions: usize,
) -> Result<LogicalPlanBuilder> {
let table_schema = Arc::new(table_schema.clone());
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/execution/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_execution::TaskContext;
use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams};
use datafusion_expr::logical_plan::{LogicalPlan, Values};
use datafusion_expr::{
Aggregate, AggregateUDF, EmptyRelation, Expr, LogicalPlanBuilder, UNNAMED_TABLE,
Aggregate, AggregateUDF, EmptyRelation, Expr, LogicalPlanBuilder, UNNAMED_TABLE, col,
};
use datafusion_functions_aggregate::count::Count;
use datafusion_physical_plan::collect;
Expand Down Expand Up @@ -110,7 +110,7 @@ fn inline_scan_projection_test() -> Result<()> {
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let projection = vec![schema.index_of(column)?];
let projection = vec![col(column)];

let provider = ViewTable::new(
LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down
Loading
Loading