diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs index 0c41fc8e9ef2..b8c4d6d6f0d7 100644 --- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec, - sort_preserving_merge_exec, stream_exec, + coalesce_partitions_exec, global_limit_exec, hash_join_exec, local_limit_exec, + sort_exec, sort_preserving_merge_exec, stream_exec, }; use arrow::compute::SortOptions; @@ -29,6 +29,7 @@ use datafusion_common::error::Result; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::Partitioning; use datafusion_physical_expr::expressions::{BinaryExpr, col, lit}; +use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::limit_pushdown::LimitPushdown; @@ -161,6 +162,168 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_limit Ok(()) } +fn join_on_columns( + left_col: &str, + right_col: &str, +) -> Vec<(PhysicalExprRef, PhysicalExprRef)> { + vec![( + Arc::new(datafusion_physical_expr::expressions::Column::new( + left_col, 0, + )) as _, + Arc::new(datafusion_physical_expr::expressions::Column::new( + right_col, 0, + )) as _, + )] +} + +#[test] +fn absorbs_limit_into_hash_join_inner() -> Result<()> { + // HashJoinExec with Inner join should absorb limit via with_fetch + let schema = create_schema(); + let left = empty_exec(Arc::clone(&schema)); + let right = empty_exec(Arc::clone(&schema)); + let on = join_on_columns("c1", "c1"); + let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?; + let global_limit = global_limit_exec(hash_join, 0, Some(5)); + + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=5 + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)] + EmptyExec + EmptyExec + " + ); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + let optimized = format_plan(&after_optimize); + // The limit should be absorbed by the hash join (not pushed to children) + insta::assert_snapshot!( + optimized, + @r" + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5 + EmptyExec + EmptyExec + " + ); + + Ok(()) +} + +#[test] +fn absorbs_limit_into_hash_join_right() -> Result<()> { + // HashJoinExec with Right join should absorb limit via with_fetch + let schema = create_schema(); + let left = empty_exec(Arc::clone(&schema)); + let right = empty_exec(Arc::clone(&schema)); + let on = join_on_columns("c1", "c1"); + let hash_join = hash_join_exec(left, right, on, None, &JoinType::Right)?; + let global_limit = global_limit_exec(hash_join, 0, Some(10)); + + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=10 + HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)] + EmptyExec + EmptyExec + " + ); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + let optimized = format_plan(&after_optimize); + // The limit should be absorbed by the hash join + insta::assert_snapshot!( + optimized, + @r" + 
HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)], fetch=10 + EmptyExec + EmptyExec + " + ); + + Ok(()) +} + +#[test] +fn absorbs_limit_into_hash_join_left() -> Result<()> { + // HashJoinExec with Left join should absorb limit via with_fetch: matched rows are emitted + // during probing, then unmatched rows at the end, stopping when the limit is reached + let schema = create_schema(); + let left = empty_exec(Arc::clone(&schema)); + let right = empty_exec(Arc::clone(&schema)); + let on = join_on_columns("c1", "c1"); + let hash_join = hash_join_exec(left, right, on, None, &JoinType::Left)?; + let global_limit = global_limit_exec(hash_join, 0, Some(5)); + + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=0, fetch=5 + HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)] + EmptyExec + EmptyExec + " + ); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + let optimized = format_plan(&after_optimize); + // Left join now absorbs the limit + insta::assert_snapshot!( + optimized, + @r" + HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5 + EmptyExec + EmptyExec + " + ); + + Ok(()) +} + +#[test] +fn absorbs_limit_with_skip_into_hash_join() -> Result<()> { + let schema = create_schema(); + let left = empty_exec(Arc::clone(&schema)); + let right = empty_exec(Arc::clone(&schema)); + let on = join_on_columns("c1", "c1"); + let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?; + let global_limit = global_limit_exec(hash_join, 3, Some(5)); + + let initial = format_plan(&global_limit); + insta::assert_snapshot!( + initial, + @r" + GlobalLimitExec: skip=3, fetch=5 + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)] + EmptyExec + EmptyExec + " + ); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + let optimized = format_plan(&after_optimize); + // With skip, GlobalLimit is kept but fetch (skip + limit = 8) is absorbed by the join + insta::assert_snapshot!( + optimized, + @r" + GlobalLimitExec: skip=3, fetch=5 + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=8 + EmptyExec + EmptyExec + " + ); + + Ok(()) +} + #[test] fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { let schema = create_schema(); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a330ad54cb33..3523ea739f31 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -258,6 +258,11 @@ pub struct HashJoinExecBuilder { partition_mode: PartitionMode, null_equality: NullEquality, null_aware: bool, + /// Maximum number of rows to return + /// + /// If the operator produces `< fetch` rows, it returns all available rows. + /// If it produces `>= fetch` rows, it returns exactly `fetch` rows and stops early. + fetch: Option<usize>, } impl HashJoinExecBuilder { @@ -278,6 +283,7 @@ impl HashJoinExecBuilder { join_type, null_equality: NullEquality::NullEqualsNothing, null_aware: false, + fetch: None, } } @@ -316,6 +322,12 @@ impl HashJoinExecBuilder { self } + /// Set fetch limit. + pub fn with_fetch(mut self, fetch: Option<usize>) -> Self { + self.fetch = fetch; + self + } + /// Build resulting execution plan. 
pub fn build(self) -> Result<HashJoinExec> { let Self { @@ -328,6 +340,7 @@ impl HashJoinExecBuilder { partition_mode, null_equality, null_aware, + fetch, } = self; let left_schema = left.schema(); @@ -393,6 +406,7 @@ impl HashJoinExecBuilder { null_aware, cache, dynamic_filter: None, + fetch, }) } } @@ -409,6 +423,7 @@ impl From<&HashJoinExec> for HashJoinExecBuilder { partition_mode: exec.mode, null_equality: exec.null_equality, null_aware: exec.null_aware, + fetch: exec.fetch, } } } @@ -646,6 +661,8 @@ pub struct HashJoinExec { /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result. /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. dynamic_filter: Option, + /// Maximum number of rows to return + fetch: Option<usize>, } #[derive(Clone)] @@ -910,25 +927,27 @@ impl HashJoinExec { ) -> Result<Arc<dyn ExecutionPlan>> { let left = self.left(); let right = self.right(); - let new_join = HashJoinExec::try_new( + let new_join = HashJoinExecBuilder::new( Arc::clone(right), Arc::clone(left), self.on() .iter() .map(|(l, r)| (Arc::clone(r), Arc::clone(l))) .collect(), - self.filter().map(JoinFilter::swap), - &self.join_type().swap(), - swap_join_projection( - left.schema().fields().len(), - right.schema().fields().len(), - self.projection.as_deref(), - self.join_type(), - ), - partition_mode, - self.null_equality(), - self.null_aware, - )?; + self.join_type().swap(), + ) + .with_filter(self.filter().map(JoinFilter::swap)) + .with_projection(swap_join_projection( + left.schema().fields().len(), + right.schema().fields().len(), + self.projection.as_deref(), + self.join_type(), + )) + .with_partition_mode(partition_mode) + .with_null_equality(self.null_equality()) + .with_null_aware(self.null_aware) + .with_fetch(self.fetch) + .build()?; // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again if matches!( self.join_type(), @@ -979,6 +998,9 @@ impl DisplayAs for HashJoinExec { } else { "" }; + let display_fetch = self + .fetch + .map_or_else(String::new, |f| format!(", fetch={f}")); let on = self .on .iter() @@ -987,13 +1009,14 @@ .join(", "); write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}", self.mode, self.join_type, on, display_filter, display_projections, display_null_equality, + display_fetch, ) } DisplayFormatType::TreeRender => { @@ -1020,6 +1043,10 @@ writeln!(f, "filter={filter}")?; } + if let Some(fetch) = self.fetch { + writeln!(f, "fetch={fetch}")?; + } + Ok(()) } } @@ -1122,6 +1149,7 @@ impl ExecutionPlan for HashJoinExec { )?, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), + fetch: self.fetch, })) } @@ -1145,6 +1173,7 @@ cache: self.cache.clone(), // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, + fetch: self.fetch, })) } @@ -1312,6 +1341,7 @@ build_accumulator, self.mode, self.null_aware, + self.fetch, ))) } @@ -1338,7 +1368,9 @@ &self.join_schema, )?; // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) + let stats = stats.project(self.projection.as_ref()); + // Apply fetch limit to statistics + stats.with_fetch(self.fetch, 0, 1) } /// Tries to push `projection` down through 
`hash_join`. If possible, performs the @@ -1367,18 +1399,22 @@ &schema, self.filter(), )? { - Ok(Some(Arc::new(HashJoinExec::try_new( - Arc::new(projected_left_child), - Arc::new(projected_right_child), - join_on, - join_filter, - self.join_type(), + Ok(Some(Arc::new( + HashJoinExecBuilder::new( + Arc::new(projected_left_child), + Arc::new(projected_right_child), + join_on, + *self.join_type(), + ) + .with_filter(join_filter) // Returned early if projection is not None - None, - *self.partition_mode(), - self.null_equality, - self.null_aware, - )?))) + .with_projection(None) + .with_partition_mode(*self.partition_mode()) + .with_null_equality(self.null_equality) + .with_null_aware(self.null_aware) + .with_fetch(self.fetch) + .build()?, + ))) } else { try_embed_projection(projection, self) } @@ -1476,12 +1512,32 @@ filter: dynamic_filter, build_accumulator: OnceLock::new(), }), + fetch: self.fetch, }); result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>); } } Ok(result) } + + fn supports_limit_pushdown(&self) -> bool { + // Hash join execution plan does not support pushing limit down through to children + // because the children don't know about the join condition and can't + // determine how many rows to produce + false + } + + fn fetch(&self) -> Option<usize> { + self.fetch + } + + fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> { + HashJoinExecBuilder::from(self) + .with_fetch(limit) + .build() + .ok() + .map(|exec| Arc::new(exec) as _) + } } /// Accumulator for collecting min/max bounds from build-side data during hash join. diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 54e620f99de7..8af26c1b8a05 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use std::task::Poll; +use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::joins::Map; use crate::joins::MapOffset; use crate::joins::PartitionMode; @@ -46,7 +47,6 @@ use crate::{ }; use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array}; -use arrow::compute::BatchCoalescer; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{ @@ -221,10 +221,9 @@ pub(super) struct HashJoinStream { build_waiter: Option>, /// Partitioning mode to use mode: PartitionMode, - /// Output buffer for coalescing small batches into larger ones. - /// Uses `BatchCoalescer` from arrow to efficiently combine batches. - /// When batches are already close to target size, they bypass coalescing. - output_buffer: Box<BatchCoalescer>, + /// Output buffer for coalescing small batches into larger ones with an optional fetch limit. + /// Uses `LimitedBatchCoalescer` to efficiently combine batches and enforce the optional `fetch` limit. + output_buffer: LimitedBatchCoalescer, /// Whether this is a null-aware anti join null_aware: bool, } @@ -375,14 +374,11 @@ impl HashJoinStream { build_accumulator: Option<Arc<SharedBoundsAccumulator>>, mode: PartitionMode, null_aware: bool, + fetch: Option<usize>, ) -> Self { - // Create output buffer with coalescing. - // Use biggest_coalesce_batch_size to bypass coalescing for batches - // that are already close to target size (within 50%). 
- let output_buffer = Box::new( - BatchCoalescer::new(Arc::clone(&schema), batch_size) - .with_biggest_coalesce_batch_size(Some(batch_size / 2)), - ); + // Create output buffer with coalescing and optional fetch limit. + let output_buffer = + LimitedBatchCoalescer::new(Arc::clone(&schema), batch_size, fetch); Self { partition, @@ -425,6 +421,11 @@ impl HashJoinStream { .record_poll(Poll::Ready(Some(Ok(batch)))); } + // Check if the coalescer has finished (limit reached and flushed) + if self.output_buffer.is_finished() { + return Poll::Ready(None); + } + return match self.state { HashJoinStreamState::WaitBuildSide => { handle_state!(ready!(self.collect_build_side(cx))) @@ -443,7 +444,7 @@ impl HashJoinStream { } HashJoinStreamState::Completed if !self.output_buffer.is_empty() => { // Flush any remaining buffered data - self.output_buffer.finish_buffered_batch()?; + self.output_buffer.finish()?; // Continue loop to emit the flushed batch continue; } @@ -782,10 +783,17 @@ impl HashJoinStream { join_side, )?; - self.output_buffer.push_batch(batch)?; + let push_status = self.output_buffer.push_batch(batch)?; timer.done(); + // If limit reached, finish and move to Completed state + if push_status == PushBatchStatus::LimitReached { + self.output_buffer.finish()?; + self.state = HashJoinStreamState::Completed; + return Ok(StatefulStreamResult::Continue); + } + if next_offset.is_none() { self.state = HashJoinStreamState::FetchProbeBatch; } else { @@ -892,7 +900,12 @@ impl HashJoinStream { &self.column_indices, JoinSide::Left, )?; - self.output_buffer.push_batch(batch)?; + let push_status = self.output_buffer.push_batch(batch)?; + + // If limit reached, finish the coalescer + if push_status == PushBatchStatus::LimitReached { + self.output_buffer.finish()?; + } } Ok(StatefulStreamResult::Continue) diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index 1b25a01d6210..59f3d8285af4 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -55,7 +55,7 @@ logical_plan 07)--------TableScan: annotated_data projection=[a, c] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5 -02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1], fetch=5 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], file_type=csv, has_header=true 04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true @@ -96,7 +96,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10 02)--ProjectionExec: expr=[a@0 as a2, b@1 as b] -03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1] +03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1], fetch=10 04)------CoalescePartitionsExec 05)--------FilterExec: d@1 = 3 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 diff --git 
a/datafusion/sqllogictest/test_files/join_limit_pushdown.slt b/datafusion/sqllogictest/test_files/join_limit_pushdown.slt new file mode 100644 index 000000000000..6bb23c1b4c24 --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_limit_pushdown.slt @@ -0,0 +1,269 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for limit pushdown into joins + +# need to use a single partition for deterministic results +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + +# Create test tables +statement ok +CREATE TABLE t1 (a INT, b VARCHAR) AS VALUES + (1, 'one'), + (2, 'two'), + (3, 'three'), + (4, 'four'), + (5, 'five'); + +statement ok +CREATE TABLE t2 (x INT, y VARCHAR) AS VALUES + (1, 'alpha'), + (2, 'beta'), + (3, 'gamma'), + (6, 'delta'), + (7, 'epsilon'); + +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Inner Join: t1.a = t2.x +03)----TableScan: t1 projection=[a] +04)----TableScan: t2 projection=[x] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=2 +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +query II +SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 2; +---- +1 1 +2 2 + +# Right join is converted to Left join with projection - fetch pushdown is supported +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 RIGHT JOIN t2 ON t1.a = t2.x LIMIT 3; +---- +logical_plan +01)Limit: skip=0, fetch=3 +02)--Right Join: t1.a = t2.x +03)----TableScan: t1 projection=[a] +04)----Limit: skip=0, fetch=3 +05)------TableScan: t2 projection=[x], fetch=3 +physical_plan +01)ProjectionExec: expr=[a@1 as a, x@0 as x] +02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(x@0, a@0)], fetch=3 +03)----DataSourceExec: partitions=1, partition_sizes=[1], fetch=3 +04)----DataSourceExec: partitions=1, partition_sizes=[1] + +query II +SELECT t1.a, t2.x FROM t1 RIGHT JOIN t2 ON t1.a = t2.x LIMIT 3; +---- +1 1 +2 2 +3 3 + +# Left join supports fetch pushdown +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 LEFT JOIN t2 ON t1.a = t2.x LIMIT 3; +---- +logical_plan +01)Limit: skip=0, fetch=3 +02)--Left Join: t1.a = t2.x +03)----Limit: skip=0, fetch=3 +04)------TableScan: t1 projection=[a], fetch=3 +05)----TableScan: t2 projection=[x] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, x@0)], fetch=3 +02)--DataSourceExec: partitions=1, partition_sizes=[1], fetch=3 +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +query II +SELECT t1.a, t2.x FROM t1 LEFT JOIN t2 ON t1.a = t2.x LIMIT 3; +---- +1 1 +2 2 +3 3 + 
+ +# Full join supports fetch pushdown +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 FULL OUTER JOIN t2 ON t1.a = t2.x LIMIT 4; +---- +logical_plan +01)Limit: skip=0, fetch=4 +02)--Full Join: t1.a = t2.x +03)----TableScan: t1 projection=[a] +04)----TableScan: t2 projection=[x] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, x@0)], fetch=4 +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Note: FULL OUTER JOIN order is not deterministic, so we just check count +query I +SELECT COUNT(*) FROM (SELECT t1.a, t2.x FROM t1 FULL OUTER JOIN t2 ON t1.a = t2.x LIMIT 4); +---- +4 + +# EXISTS becomes left semi join - fetch pushdown is supported +query TT +EXPLAIN SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--LeftSemi Join: t2.x = __correlated_sq_1.a +03)----TableScan: t2 projection=[x] +04)----SubqueryAlias: __correlated_sq_1 +05)------TableScan: t1 projection=[a] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(x@0, a@0)], fetch=2 +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +query I +SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 2; +---- +1 +2 + +# NOT EXISTS becomes LeftAnti - fetch pushdown is supported +query TT +EXPLAIN SELECT t2.x FROM t2 WHERE NOT EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 1; +---- +logical_plan +01)Limit: skip=0, fetch=1 +02)--LeftAnti Join: t2.x = __correlated_sq_1.a +03)----TableScan: t2 projection=[x] +04)----SubqueryAlias: __correlated_sq_1 +05)------TableScan: t1 projection=[a] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(x@0, a@0)], fetch=1 +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +query I +SELECT t2.x FROM t2 WHERE NOT EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 1; +---- +6 + +# Inner join with OFFSET: the join absorbs skip + fetch (fetch=2), and GlobalLimitExec remains to apply the skip +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 1 OFFSET 1; +---- +logical_plan +01)Limit: skip=1, fetch=1 +02)--Inner Join: t1.a = t2.x +03)----TableScan: t1 projection=[a] +04)----TableScan: t2 projection=[x] +physical_plan +01)GlobalLimitExec: skip=1, fetch=1 +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=2 +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----DataSourceExec: partitions=1, partition_sizes=[1] + +query II +SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 1 OFFSET 1; +---- +2 2 + +# LIMIT 0 folds to an empty relation +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 0; +---- +logical_plan EmptyRelation: rows=0 +physical_plan EmptyExec + +query II +SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 0; +---- + +# Multi-join: only the topmost join absorbs the limit +statement ok +CREATE TABLE t3 (p INT, q VARCHAR) AS VALUES + (1, 'foo'), + (2, 'bar'), + (3, 'baz'); + +query TT +EXPLAIN SELECT t1.a, t2.x, t3.p +FROM t1 +INNER JOIN t2 ON t1.a = t2.x +INNER JOIN t3 ON t2.x = t3.p +LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Inner Join: t2.x = t3.p +03)----Inner Join: t1.a = t2.x +04)------TableScan: t1 projection=[a] +05)------TableScan: t2 projection=[x] +06)----TableScan: t3 projection=[p] +physical_plan +01)ProjectionExec: expr=[a@1 as a, x@2 as x, p@0 as p] +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p@0, x@1)], fetch=2 +03)----DataSourceExec: partitions=1, 
partition_sizes=[1] +04)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)] +05)------DataSourceExec: partitions=1, partition_sizes=[1] +06)------DataSourceExec: partitions=1, partition_sizes=[1] + +query III +SELECT t1.a, t2.x, t3.p +FROM t1 +INNER JOIN t2 ON t1.a = t2.x +INNER JOIN t3 ON t2.x = t3.p +LIMIT 2; +---- +1 1 1 +2 2 2 + +# Limit larger than the total number of join rows +query TT +EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100; +---- +logical_plan +01)Limit: skip=0, fetch=100 +02)--Inner Join: t1.a = t2.x +03)----TableScan: t1 projection=[a] +04)----TableScan: t2 projection=[x] +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=100 +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +query II +SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100; +---- +1 1 +2 2 +3 3 + +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t3; diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index dd7f4710d9db..2d7075e251c4 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4161,10 +4161,9 @@ logical_plan 03)----TableScan: t0 projection=[c1, c2] 04)----TableScan: t1 projection=[c1, c2, c3] physical_plan -01)GlobalLimitExec: skip=0, fetch=2 -02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)] -03)----DataSourceExec: partitions=1, partition_sizes=[2] -04)----DataSourceExec: partitions=1, partition_sizes=[2] +01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], fetch=2 +02)--DataSourceExec: partitions=1, partition_sizes=[2] +03)--DataSourceExec: partitions=1, partition_sizes=[2] ## Test join.on.is_empty() && join.filter.is_some() -> single filter now a PWMJ query TT @@ -4191,10 +4190,9 @@ logical_plan 03)----TableScan: t0 projection=[c1, c2] 04)----TableScan: t1 projection=[c1, c2, c3] physical_plan -01)GlobalLimitExec: skip=0, fetch=2 -02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1 -03)----DataSourceExec: partitions=1, partition_sizes=[2] -04)----DataSourceExec: partitions=1, partition_sizes=[2] +01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1, fetch=2 +02)--DataSourceExec: partitions=1, partition_sizes=[2] +03)--DataSourceExec: partitions=1, partition_sizes=[2] ## Add more test cases for join limit pushdown statement ok @@ -4245,6 +4243,7 @@ select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; 1 1 # can only push down to t1 (preserved side) +# limit pushdown is now supported for left join - both into the join (fetch) and to the preserved-side scan query TT explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; ---- @@ -4255,10 +4254,9 @@ logical_plan 04)------TableScan: t1 projection=[a], fetch=2 05)----TableScan: t2 projection=[b] physical_plan -01)GlobalLimitExec: skip=0, fetch=2 -02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], limit=2, file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true +01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)], fetch=2 +02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], limit=2, file_type=csv, has_header=true +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true ###### ## RIGHT JOIN w/ LIMIT @@ -4289,10 +4287,9 @@ logical_plan 04)----Limit: skip=0, fetch=2 05)------TableScan: t2 projection=[b], fetch=2 physical_plan -01)GlobalLimitExec: skip=0, fetch=2 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], limit=2, file_type=csv, has_header=true +01)HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)], fetch=2 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], limit=2, file_type=csv, has_header=true ###### ## FULL JOIN w/ LIMIT @@ -4316,7 +4313,7 @@ select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2; 4 4 -# can't push limit for full outer join +# full outer join supports fetch pushdown query TT explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2; ---- @@ -4326,10 +4323,9 @@ logical_plan 03)----TableScan: t1 projection=[a] 04)----TableScan: t2 projection=[b] physical_plan -01)GlobalLimitExec: skip=0, fetch=2 -02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true +01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)], fetch=2 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true statement ok drop table t1;
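The stream-side changes above reduce to one pattern: each joined batch is pushed into a limit-aware output buffer, and a `LimitReached` status short-circuits further probing, after which the stream reports itself finished. The following self-contained Rust sketch illustrates that control flow; `LimitedBuffer`, `PushStatus`, and `push` are illustrative names for this sketch only, and plain row vectors stand in for Arrow `RecordBatch`es. It is not the actual `LimitedBatchCoalescer` API.

// Minimal sketch of the limit-absorbing buffer pattern used by the stream
// changes above (illustrative names only, not the DataFusion implementation).

#[derive(Debug, PartialEq)]
enum PushStatus {
    Continue,
    LimitReached,
}

struct LimitedBuffer {
    rows: Vec<u64>,       // stands in for buffered output rows
    fetch: Option<usize>, // maximum number of rows to emit, if any
    finished: bool,
}

impl LimitedBuffer {
    fn new(fetch: Option<usize>) -> Self {
        Self { rows: Vec::new(), fetch, finished: false }
    }

    /// Buffer a batch, truncating at the fetch limit, and report whether
    /// the limit has been reached.
    fn push(&mut self, batch: &[u64]) -> PushStatus {
        let remaining = self
            .fetch
            .map(|f| f.saturating_sub(self.rows.len()))
            .unwrap_or(usize::MAX);
        self.rows.extend_from_slice(&batch[..remaining.min(batch.len())]);
        if self.fetch.is_some_and(|f| self.rows.len() >= f) {
            PushStatus::LimitReached
        } else {
            PushStatus::Continue
        }
    }

    /// Flush remaining buffered data; afterwards the stream yields nothing.
    fn finish(&mut self) {
        self.finished = true;
    }
}

fn main() {
    let mut buf = LimitedBuffer::new(Some(5));
    let batches = vec![vec![1u64, 2, 3], vec![4, 5, 6], vec![7, 8]];
    for batch in &batches {
        // Mirrors the probe loop: on LimitReached, finish the buffer and
        // transition to the Completed state instead of probing further input.
        if buf.push(batch) == PushStatus::LimitReached {
            buf.finish();
            break;
        }
    }
    assert_eq!(buf.rows, vec![1, 2, 3, 4, 5]); // exactly `fetch` rows
    assert!(buf.finished);
}

Truncating inside `push` is also what lets the operator absorb an OFFSET: as in the `absorbs_limit_with_skip_into_hash_join` test, the join materializes only the first `skip + fetch` rows, and the `GlobalLimitExec` kept above it applies the skip.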