From 832a79c22548ff273e375910fae9ce965e5929fb Mon Sep 17 00:00:00 2001
From: Jonathan Chen
Date: Sun, 8 Feb 2026 22:53:30 -0600
Subject: [PATCH 1/5] feat: Pushdown limit into hash join

---
 .../physical_optimizer/limit_pushdown.rs      | 167 ++++++++++-
 .../physical-plan/src/joins/hash_join/exec.rs |  73 ++++-
 .../src/joins/hash_join/stream.rs             |  43 ++-
 .../join_disable_repartition_joins.slt        |   4 +-
 .../test_files/join_limit_pushdown.slt        | 269 ++++++++++++++++++
 datafusion/sqllogictest/test_files/joins.slt  |  38 ++-
 6 files changed, 553 insertions(+), 41 deletions(-)
 create mode 100644 datafusion/sqllogictest/test_files/join_limit_pushdown.slt

diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 0c41fc8e9ef21..b8c4d6d6f0d7a 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -18,8 +18,8 @@
 use std::sync::Arc;
 
 use crate::physical_optimizer::test_utils::{
-    coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec,
-    sort_preserving_merge_exec, stream_exec,
+    coalesce_partitions_exec, global_limit_exec, hash_join_exec, local_limit_exec,
+    sort_exec, sort_preserving_merge_exec, stream_exec,
 };
 
 use arrow::compute::SortOptions;
@@ -29,6 +29,7 @@ use datafusion_common::error::Result;
 use datafusion_expr::{JoinType, Operator};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
@@ -161,6 +162,168 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
     Ok(())
 }
 
+fn join_on_columns(
+    left_col: &str,
+    right_col: &str,
+) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
+    vec![(
+        Arc::new(datafusion_physical_expr::expressions::Column::new(
+            left_col, 0,
+        )) as _,
+        Arc::new(datafusion_physical_expr::expressions::Column::new(
+            right_col, 0,
+        )) as _,
+    )]
+}
+
+#[test]
+fn absorbs_limit_into_hash_join_inner() -> Result<()> {
+    // HashJoinExec with Inner join should absorb limit via with_fetch
+    let schema = create_schema();
+    let left = empty_exec(Arc::clone(&schema));
+    let right = empty_exec(Arc::clone(&schema));
+    let on = join_on_columns("c1", "c1");
+    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
+    let global_limit = global_limit_exec(hash_join, 0, Some(5));
+
+    let initial = format_plan(&global_limit);
+    insta::assert_snapshot!(
+        initial,
+        @r"
+    GlobalLimitExec: skip=0, fetch=5
+      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
+        EmptyExec
+        EmptyExec
+    "
+    );
+
+    let after_optimize =
+        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+    let optimized = format_plan(&after_optimize);
+    // The limit should be absorbed by the hash join (not pushed to children)
+    insta::assert_snapshot!(
+        optimized,
+        @r"
+    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5
+      EmptyExec
+      EmptyExec
+    "
+    );
+
+    Ok(())
+}
+
+#[test]
+fn absorbs_limit_into_hash_join_right() -> Result<()> {
+    // HashJoinExec with Right join should absorb limit via with_fetch
+    let schema = create_schema();
+    let left = empty_exec(Arc::clone(&schema));
+    let right = empty_exec(Arc::clone(&schema));
+    let on = join_on_columns("c1", "c1");
+    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Right)?;
+    let global_limit = global_limit_exec(hash_join, 0, Some(10));
+
+    let initial = format_plan(&global_limit);
+    insta::assert_snapshot!(
+        initial,
+        @r"
+    GlobalLimitExec: skip=0, fetch=10
+      HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)]
+        EmptyExec
+        EmptyExec
+    "
+    );
+
+    let after_optimize =
+        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+    let optimized = format_plan(&after_optimize);
+    // The limit should be absorbed by the hash join
+    insta::assert_snapshot!(
+        optimized,
+        @r"
+    HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)], fetch=10
+      EmptyExec
+      EmptyExec
+    "
+    );
+
+    Ok(())
+}
+
+#[test]
+fn absorbs_limit_into_hash_join_left() -> Result<()> {
+    // HashJoinExec with Left join should absorb limit via with_fetch:
+    // matched rows are emitted during probing, then unmatched rows at the
+    // end, stopping when the limit is reached
+    let schema = create_schema();
+    let left = empty_exec(Arc::clone(&schema));
+    let right = empty_exec(Arc::clone(&schema));
+    let on = join_on_columns("c1", "c1");
+    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Left)?;
+    let global_limit = global_limit_exec(hash_join, 0, Some(5));
+
+    let initial = format_plan(&global_limit);
+    insta::assert_snapshot!(
+        initial,
+        @r"
+    GlobalLimitExec: skip=0, fetch=5
+      HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)]
+        EmptyExec
+        EmptyExec
+    "
+    );
+
+    let after_optimize =
+        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+    let optimized = format_plan(&after_optimize);
+    // Left join now absorbs the limit
+    insta::assert_snapshot!(
+        optimized,
+        @r"
+    HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5
+      EmptyExec
+      EmptyExec
+    "
+    );
+
+    Ok(())
+}
+
+#[test]
+fn absorbs_limit_with_skip_into_hash_join() -> Result<()> {
+    let schema = create_schema();
+    let left = empty_exec(Arc::clone(&schema));
+    let right = empty_exec(Arc::clone(&schema));
+    let on = join_on_columns("c1", "c1");
+    let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
+    let global_limit = global_limit_exec(hash_join, 3, Some(5));
+
+    let initial = format_plan(&global_limit);
+    insta::assert_snapshot!(
+        initial,
+        @r"
+    GlobalLimitExec: skip=3, fetch=5
+      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
+        EmptyExec
+        EmptyExec
+    "
+    );
+
+    let after_optimize =
+        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+    let optimized = format_plan(&after_optimize);
+    // With skip, GlobalLimit is kept but fetch (skip + limit = 8) is absorbed by the join
+    insta::assert_snapshot!(
+        optimized,
+        @r"
+    GlobalLimitExec: skip=3, fetch=5
+      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=8
+        EmptyExec
+        EmptyExec
+    "
+    );
+
+    Ok(())
+}
+
 #[test]
 fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {
     let schema = create_schema();
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index a330ad54cb338..33bee98a64f85 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -258,6 +258,8 @@ pub struct HashJoinExecBuilder {
     partition_mode: PartitionMode,
     null_equality: NullEquality,
     null_aware: bool,
+    /// Maximum number of rows to return
+    fetch: Option<usize>,
 }
 
 impl HashJoinExecBuilder {
@@ -278,6 +280,7 @@ impl HashJoinExecBuilder {
             join_type,
             null_equality: NullEquality::NullEqualsNothing,
             null_aware: false,
+            fetch: None,
         }
     }
 
@@ -316,6 +319,12 @@ impl HashJoinExecBuilder {
         self
     }
 
+    /// Set fetch limit.
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
+    }
+
     /// Build resulting execution plan.
     pub fn build(self) -> Result<HashJoinExec> {
         let Self {
@@ -328,6 +337,7 @@ impl HashJoinExecBuilder {
             partition_mode,
             null_equality,
             null_aware,
+            fetch,
         } = self;
 
         let left_schema = left.schema();
@@ -393,6 +403,7 @@ impl HashJoinExecBuilder {
             null_aware,
             cache,
             dynamic_filter: None,
+            fetch,
         })
     }
 }
@@ -409,6 +420,7 @@ impl From<&HashJoinExec> for HashJoinExecBuilder {
             partition_mode: exec.mode,
             null_equality: exec.null_equality,
             null_aware: exec.null_aware,
+            fetch: exec.fetch,
         }
     }
 }
@@ -646,6 +658,8 @@ pub struct HashJoinExec {
     /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
     /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
     dynamic_filter: Option,
+    /// Maximum number of rows to return
+    fetch: Option<usize>,
 }
 
 #[derive(Clone)]
@@ -760,6 +774,11 @@ impl HashJoinExec {
         self.null_equality
     }
 
+    /// Get the fetch (limit) for this join
+    pub fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
     /// Get the dynamic filter expression for testing purposes.
     /// Returns `None` if no dynamic filter has been set.
     ///
@@ -979,6 +998,9 @@ impl DisplayAs for HashJoinExec {
                 } else {
                     ""
                 };
+                let display_fetch = self
+                    .fetch
+                    .map_or_else(String::new, |f| format!(", fetch={f}"));
                 let on = self
                     .on
                     .iter()
@@ -987,13 +1009,14 @@ impl DisplayAs for HashJoinExec {
                     .join(", ");
                 write!(
                     f,
-                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
+                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
                     self.mode,
                     self.join_type,
                     on,
                     display_filter,
                     display_projections,
                     display_null_equality,
+                    display_fetch,
                 )
             }
             DisplayFormatType::TreeRender => {
@@ -1020,6 +1043,10 @@ impl DisplayAs for HashJoinExec {
                     writeln!(f, "filter={filter}")?;
                 }
 
+                if let Some(fetch) = self.fetch {
+                    writeln!(f, "fetch={fetch}")?;
+                }
+
                 Ok(())
             }
         }
@@ -1122,6 +1149,7 @@ impl ExecutionPlan for HashJoinExec {
             )?,
             // Keep the dynamic filter, bounds accumulator will be reset
             dynamic_filter: self.dynamic_filter.clone(),
+            fetch: self.fetch,
         }))
     }
 
@@ -1145,6 +1173,7 @@ impl ExecutionPlan for HashJoinExec {
             cache: self.cache.clone(),
             // Reset dynamic filter and bounds accumulator to initial state
             dynamic_filter: None,
+            fetch: self.fetch,
         }))
     }
 
@@ -1312,6 +1341,7 @@ impl ExecutionPlan for HashJoinExec {
             build_accumulator,
             self.mode,
             self.null_aware,
+            self.fetch,
         )))
     }
 
@@ -1476,12 +1506,53 @@ impl ExecutionPlan for HashJoinExec {
                     filter: dynamic_filter,
                     build_accumulator: OnceLock::new(),
                 }),
+                fetch: self.fetch,
             });
                 result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
             }
         }
         Ok(result)
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        // Hash join execution plan does not support pushing limit down through to children
+        // because the children don't know about the join condition and can't
+        // determine how many rows to produce
+        false
+    }
+
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        // Null-aware anti join requires seeing ALL probe rows to check for NULLs.
+        // If any probe row has NULL, the output must be empty.
+        // We can't stop early or we might miss a NULL and return wrong results.
+        if self.null_aware {
+            return None;
+        }
+
+        Some(Arc::new(HashJoinExec {
+            left: Arc::clone(&self.left),
+            right: Arc::clone(&self.right),
+            on: self.on.clone(),
+            filter: self.filter.clone(),
+            join_type: self.join_type,
+            join_schema: Arc::clone(&self.join_schema),
+            left_fut: Arc::clone(&self.left_fut),
+            random_state: self.random_state.clone(),
+            mode: self.mode,
+            metrics: ExecutionPlanMetricsSet::new(),
+            projection: self.projection.clone(),
+            column_indices: self.column_indices.clone(),
+            null_equality: self.null_equality,
+            null_aware: self.null_aware,
+            cache: self.cache.clone(),
+            dynamic_filter: self.dynamic_filter.clone(),
+            fetch: limit,
+        }))
+    }
 }
 
 /// Accumulator for collecting min/max bounds from build-side data during hash join.
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 54e620f99de7a..8af26c1b8a055 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use std::task::Poll;
 
+use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
 use crate::joins::Map;
 use crate::joins::MapOffset;
 use crate::joins::PartitionMode;
@@ -46,7 +47,6 @@ use crate::{
 };
 
 use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
-use arrow::compute::BatchCoalescer;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{
@@ -221,10 +221,9 @@ pub(super) struct HashJoinStream {
     build_waiter: Option,
     /// Partitioning mode to use
     mode: PartitionMode,
-    /// Output buffer for coalescing small batches into larger ones.
-    /// Uses `BatchCoalescer` from arrow to efficiently combine batches.
-    /// When batches are already close to target size, they bypass coalescing.
-    output_buffer: Box<BatchCoalescer>,
+    /// Output buffer for coalescing small batches into larger ones, with an
+    /// optional fetch limit.
+    /// Uses `LimitedBatchCoalescer` to efficiently combine batches and absorb
+    /// the limit via `fetch`.
+    output_buffer: LimitedBatchCoalescer,
     /// Whether this is a null-aware anti join
     null_aware: bool,
 }
@@ -375,14 +374,11 @@ impl HashJoinStream {
         build_accumulator: Option,
         mode: PartitionMode,
         null_aware: bool,
+        fetch: Option<usize>,
     ) -> Self {
-        // Create output buffer with coalescing.
-        // Use biggest_coalesce_batch_size to bypass coalescing for batches
-        // that are already close to target size (within 50%).
-        let output_buffer = Box::new(
-            BatchCoalescer::new(Arc::clone(&schema), batch_size)
-                .with_biggest_coalesce_batch_size(Some(batch_size / 2)),
-        );
+        // Create output buffer with coalescing and optional fetch limit.
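+        // Note (assumption, inferred from the usage below): with
+        // `fetch = Some(n)`, `push_batch` returns `PushBatchStatus::LimitReached`
+        // once `n` rows have been buffered; the stream then calls `finish()`,
+        // after which `is_finished()` reports true and at most `n` rows are
+        // emitted in total. With `fetch = None` it coalesces without a cap.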
+        let output_buffer =
+            LimitedBatchCoalescer::new(Arc::clone(&schema), batch_size, fetch);
 
         Self {
             partition,
@@ -425,6 +421,11 @@ impl HashJoinStream {
                 .record_poll(Poll::Ready(Some(Ok(batch))));
             }
 
+            // Check if the coalescer has finished (limit reached and flushed)
+            if self.output_buffer.is_finished() {
+                return Poll::Ready(None);
+            }
+
             return match self.state {
                 HashJoinStreamState::WaitBuildSide => {
                     handle_state!(ready!(self.collect_build_side(cx)))
@@ -443,7 +444,7 @@ impl HashJoinStream {
                 }
                 HashJoinStreamState::Completed if !self.output_buffer.is_empty() => {
                     // Flush any remaining buffered data
-                    self.output_buffer.finish_buffered_batch()?;
+                    self.output_buffer.finish()?;
                     // Continue loop to emit the flushed batch
                     continue;
                 }
@@ -782,10 +783,17 @@ impl HashJoinStream {
             join_side,
         )?;
 
-        self.output_buffer.push_batch(batch)?;
+        let push_status = self.output_buffer.push_batch(batch)?;
 
         timer.done();
 
+        // If limit reached, finish and move to Completed state
+        if push_status == PushBatchStatus::LimitReached {
+            self.output_buffer.finish()?;
+            self.state = HashJoinStreamState::Completed;
+            return Ok(StatefulStreamResult::Continue);
+        }
+
         if next_offset.is_none() {
             self.state = HashJoinStreamState::FetchProbeBatch;
         } else {
@@ -892,7 +900,12 @@ impl HashJoinStream {
             &self.column_indices,
             JoinSide::Left,
         )?;
-        self.output_buffer.push_batch(batch)?;
+        let push_status = self.output_buffer.push_batch(batch)?;
+
+        // If limit reached, finish the coalescer
+        if push_status == PushBatchStatus::LimitReached {
+            self.output_buffer.finish()?;
+        }
     }
 
     Ok(StatefulStreamResult::Continue)
diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
index 1b25a01d62104..59f3d8285af49 100644
--- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
+++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
@@ -55,7 +55,7 @@ logical_plan
 07)--------TableScan: annotated_data projection=[a, c]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
-02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1], fetch=5
 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], file_type=csv, has_header=true
 04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true
@@ -96,7 +96,7 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10
 02)--ProjectionExec: expr=[a@0 as a2, b@1 as b]
-03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1], fetch=10
 04)------CoalescePartitionsExec
 05)--------FilterExec: d@1 = 3
 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
diff --git a/datafusion/sqllogictest/test_files/join_limit_pushdown.slt b/datafusion/sqllogictest/test_files/join_limit_pushdown.slt
new file mode 100644
index 0000000000000..6bb23c1b4c243
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/join_limit_pushdown.slt
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for limit pushdown into joins
+
+# need to use a single partition for deterministic results
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+# Create test tables
+statement ok
+CREATE TABLE t1 (a INT, b VARCHAR) AS VALUES
+  (1, 'one'),
+  (2, 'two'),
+  (3, 'three'),
+  (4, 'four'),
+  (5, 'five');
+
+statement ok
+CREATE TABLE t2 (x INT, y VARCHAR) AS VALUES
+  (1, 'alpha'),
+  (2, 'beta'),
+  (3, 'gamma'),
+  (6, 'delta'),
+  (7, 'epsilon');
+
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Inner Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 2;
+----
+1 1
+2 2
+
+# Right join is converted to Left join with projection - fetch pushdown is supported
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 RIGHT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+logical_plan
+01)Limit: skip=0, fetch=3
+02)--Right Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----Limit: skip=0, fetch=3
+05)------TableScan: t2 projection=[x], fetch=3
+physical_plan
+01)ProjectionExec: expr=[a@1 as a, x@0 as x]
+02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(x@0, a@0)], fetch=3
+03)----DataSourceExec: partitions=1, partition_sizes=[1], fetch=3
+04)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 RIGHT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+1 1
+2 2
+3 3
+
+# Left join supports fetch pushdown
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 LEFT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+logical_plan
+01)Limit: skip=0, fetch=3
+02)--Left Join: t1.a = t2.x
+03)----Limit: skip=0, fetch=3
+04)------TableScan: t1 projection=[a], fetch=3
+05)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, x@0)], fetch=3
+02)--DataSourceExec: partitions=1, partition_sizes=[1], fetch=3
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 LEFT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+1 1
+2 2
+3 3
+
+
+# Full join supports fetch pushdown
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 FULL OUTER JOIN t2 ON t1.a = t2.x LIMIT 4;
+----
+logical_plan
+01)Limit: skip=0, fetch=4
+02)--Full Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, x@0)], fetch=4
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Note: FULL OUTER JOIN order is not deterministic, so we just check count
+query I
+SELECT COUNT(*) FROM (SELECT t1.a, t2.x FROM t1 FULL OUTER JOIN t2 ON t1.a = t2.x LIMIT 4);
+----
+4
+
+# EXISTS becomes left semi join - fetch pushdown is supported
+query TT
+EXPLAIN SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--LeftSemi Join: t2.x = __correlated_sq_1.a
+03)----TableScan: t2 projection=[x]
+04)----SubqueryAlias: __correlated_sq_1
+05)------TableScan: t1 projection=[a]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(x@0, a@0)], fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I
+SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 2;
+----
+1
+2
+
+# NOT EXISTS becomes LeftAnti - fetch pushdown is supported
+query TT
+EXPLAIN SELECT t2.x FROM t2 WHERE NOT EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 1;
+----
+logical_plan
+01)Limit: skip=0, fetch=1
+02)--LeftAnti Join: t2.x = __correlated_sq_1.a
+03)----TableScan: t2 projection=[x]
+04)----SubqueryAlias: __correlated_sq_1
+05)------TableScan: t1 projection=[a]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(x@0, a@0)], fetch=1
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I
+SELECT t2.x FROM t2 WHERE NOT EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 1;
+----
+6
+
+# Inner join with OFFSET: GlobalLimitExec keeps the skip, while the join
+# absorbs fetch = skip + limit
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 1 OFFSET 1;
+----
+logical_plan
+01)Limit: skip=1, fetch=1
+02)--Inner Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)GlobalLimitExec: skip=1, fetch=1
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=2
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 1 OFFSET 1;
+----
+2 2
+
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 0;
+----
+logical_plan EmptyRelation: rows=0
+physical_plan EmptyExec
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 0;
+----
+
+statement ok
+CREATE TABLE t3 (p INT, q VARCHAR) AS VALUES
+  (1, 'foo'),
+  (2, 'bar'),
+  (3, 'baz');
+
+query TT
+EXPLAIN SELECT t1.a, t2.x, t3.p
+FROM t1
+INNER JOIN t2 ON t1.a = t2.x
+INNER JOIN t3 ON t2.x = t3.p
+LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Inner Join: t2.x = t3.p
+03)----Inner Join: t1.a = t2.x
+04)------TableScan: t1 projection=[a]
+05)------TableScan: t2 projection=[x]
+06)----TableScan: t3 projection=[p]
+physical_plan
+01)ProjectionExec: expr=[a@1 as a, x@2 as x, p@0 as p]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p@0, x@1)], fetch=2
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)]
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
+06)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query III
+SELECT t1.a, t2.x, t3.p
+FROM t1
+INNER JOIN t2 ON t1.a = t2.x
+INNER JOIN t3 ON t2.x = t3.p
+LIMIT 2;
+----
+1 1 1
+2 2 2
+
+# Try larger limit
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100;
+----
+logical_plan
+01)Limit: skip=0, fetch=100
+02)--Inner Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=100
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100;
+----
+1 1
+2 2
+3 3
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index dd7f4710d9dbb..2d7075e251c4e 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4161,10 +4161,9 @@ logical_plan
 03)----TableScan: t0 projection=[c1, c2]
 04)----TableScan: t1 projection=[c1, c2, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
-03)----DataSourceExec: partitions=1, partition_sizes=[2]
-04)----DataSourceExec: partitions=1, partition_sizes=[2]
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[2]
+03)--DataSourceExec: partitions=1, partition_sizes=[2]
 
 ## Test join.on.is_empty() && join.filter.is_some() -> single filter now a PWMJ
 query TT
@@ -4191,10 +4190,9 @@ logical_plan
 03)----TableScan: t0 projection=[c1, c2]
 04)----TableScan: t1 projection=[c1, c2, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1
-03)----DataSourceExec: partitions=1, partition_sizes=[2]
-04)----DataSourceExec: partitions=1, partition_sizes=[2]
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1, fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[2]
+03)--DataSourceExec: partitions=1, partition_sizes=[2]
 
 ## Add more test cases for join limit pushdown
 statement ok
@@ -4245,6 +4243,7 @@ select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
 1 1
 
 # can only push down to t1 (preserved side)
+# limit pushdown supported for left join - both to join and probe side
 query TT
 explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
 ----
@@ -4255,10 +4254,9 @@ logical_plan
 01)Limit: skip=0, fetch=2
 02)--Left Join: t1.a = t2.b
 03)----Limit: skip=0, fetch=2
 04)------TableScan: t1 projection=[a], fetch=2
 05)----TableScan: t2 projection=[b]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], limit=2, file_type=csv, has_header=true
-04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true
+01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)], fetch=2
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], limit=2, file_type=csv, has_header=true
+03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true
 
 ######
 ## RIGHT JOIN w/ LIMIT
 ######
 
@@ -4289,10 +4287,9 @@ logical_plan
 04)----Limit: skip=0, fetch=2
 05)------TableScan: t2 projection=[b], fetch=2
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true
-04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], limit=2, file_type=csv, has_header=true
+01)HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)], fetch=2
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true
+03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], limit=2, file_type=csv, has_header=true
 
 ######
 ## FULL JOIN w/ LIMIT
 ######
 
@@ -4316,7 +4313,7 @@ select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
 4 4
 
-# can't push limit for full outer join
+# full outer join supports fetch pushdown
 query TT
 explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
 ----
@@ -4326,10 +4323,9 @@ logical_plan
 01)Limit: skip=0, fetch=2
 02)--Full Join: t1.a = t2.b
 03)----TableScan: t1 projection=[a]
 04)----TableScan: t2 projection=[b]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true
-04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)], fetch=2
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]}, projection=[a], file_type=csv, has_header=true
+03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]}, projection=[b], file_type=csv, has_header=true
 
 statement ok
 drop table t1;

From 78e572ce96079059be30cbe415a82870f7faffea Mon Sep 17 00:00:00 2001
From: Jonathan Chen
Date: Mon, 9 Feb 2026 09:22:16 -0600
Subject: [PATCH 2/5] Update datafusion/physical-plan/src/joins/hash_join/exec.rs

Co-authored-by: Yongting You <2010youy01@gmail.com>
---
 datafusion/physical-plan/src/joins/hash_join/exec.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 33bee98a64f85..6710d64dd490f 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -259,6 +259,9 @@ pub struct HashJoinExecBuilder {
     null_equality: NullEquality,
     null_aware: bool,
     /// Maximum number of rows to return
+    ///
+    /// If the operator produces `< fetch` rows, it returns all available rows.
+    /// If it produces `>= fetch` rows, it returns exactly `fetch` rows and stops early.
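+    ///
+    /// For example (illustrative, mirroring the `absorbs_limit_with_skip_into_hash_join`
+    /// test above): `LIMIT 5 OFFSET 3` is absorbed as `fetch = Some(8)` (skip + limit),
+    /// while the enclosing `GlobalLimitExec` still applies the skip of 3.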
     fetch: Option<usize>,
 }
 

From 5ff7a995c40b35bb5cb547e6e9c54d20af1f9633 Mon Sep 17 00:00:00 2001
From: Jonathan Chen
Date: Mon, 9 Feb 2026 10:16:54 -0600
Subject: [PATCH 3/5] remove null aware check

---
 datafusion/physical-plan/src/joins/hash_join/exec.rs | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 6710d64dd490f..348df0f425a74 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1529,13 +1529,6 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
-        // Null-aware anti join requires seeing ALL probe rows to check for NULLs.
-        // If any probe row has NULL, the output must be empty.
-        // We can't stop early or we might miss a NULL and return wrong results.
-        if self.null_aware {
-            return None;
-        }
-
         Some(Arc::new(HashJoinExec {

From 9ce44f30ed6c10729ab832230b51f686e3de7f5b Mon Sep 17 00:00:00 2001
From: Jonathan Chen
Date: Mon, 9 Feb 2026 11:04:06 -0600
Subject: [PATCH 4/5] use builder pattern

---
 .../physical-plan/src/joins/hash_join/exec.rs | 79 +++++++++----------
 1 file changed, 36 insertions(+), 43 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 348df0f425a74..eb578def15453 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -932,25 +932,27 @@ impl HashJoinExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let left = self.left();
         let right = self.right();
-        let new_join = HashJoinExec::try_new(
+        let new_join = HashJoinExecBuilder::new(
             Arc::clone(right),
             Arc::clone(left),
             self.on()
                 .iter()
                 .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
                 .collect(),
-            self.filter().map(JoinFilter::swap),
-            &self.join_type().swap(),
-            swap_join_projection(
-                left.schema().fields().len(),
-                right.schema().fields().len(),
-                self.projection.as_deref(),
-                self.join_type(),
-            ),
-            partition_mode,
-            self.null_equality(),
-            self.null_aware,
-        )?;
+            self.join_type().swap(),
+        )
+        .with_filter(self.filter().map(JoinFilter::swap))
+        .with_projection(swap_join_projection(
+            left.schema().fields().len(),
+            right.schema().fields().len(),
+            self.projection.as_deref(),
+            self.join_type(),
+        ))
+        .with_partition_mode(partition_mode)
+        .with_null_equality(self.null_equality())
+        .with_null_aware(self.null_aware)
+        .with_fetch(self.fetch)
+        .build()?;
         // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
         if matches!(
             self.join_type(),
@@ -1400,18 +1402,22 @@ impl ExecutionPlan for HashJoinExec {
             &schema,
             self.filter(),
         )?
         {
-            Ok(Some(Arc::new(HashJoinExec::try_new(
-                Arc::new(projected_left_child),
-                Arc::new(projected_right_child),
-                join_on,
-                join_filter,
-                self.join_type(),
+            Ok(Some(Arc::new(
+                HashJoinExecBuilder::new(
+                    Arc::new(projected_left_child),
+                    Arc::new(projected_right_child),
+                    join_on,
+                    *self.join_type(),
+                )
+                .with_filter(join_filter)
                 // Returned early if projection is not None
-                None,
-                *self.partition_mode(),
-                self.null_equality,
-                self.null_aware,
-            )?)))
+                .with_projection(None)
+                .with_partition_mode(*self.partition_mode())
+                .with_null_equality(self.null_equality)
+                .with_null_aware(self.null_aware)
+                .with_fetch(self.fetch)
+                .build()?,
+            )))
         } else {
             try_embed_projection(projection, self)
         }

From 3b46fd4f48ddc85c1b03a5b635296defcc98809b Mon Sep 17 00:00:00 2001
From: Jonathan Chen
Date: Mon, 9 Feb 2026 11:17:14 -0600
Subject: [PATCH 5/5] add limit to statistics

---
 datafusion/physical-plan/src/joins/hash_join/exec.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index eb578def15453..17fe981063bea 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1373,7 +1373,9 @@ impl ExecutionPlan for HashJoinExec {
             &self.join_schema,
         )?;
         // Project statistics if there is a projection
-        Ok(stats.project(self.projection.as_ref()))
+        let stats = stats.project(self.projection.as_ref());
+        // Apply fetch limit to statistics
+        stats.with_fetch(self.fetch, 0, 1)
     }
 
     /// Tries to push `projection` down through `hash_join`. If possible, performs the
@@ -1535,7 +1537,6 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
-
         HashJoinExecBuilder::from(self)
             .with_fetch(limit)
            .build()
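
The behavior this series implements can be summarized with a small standalone
model. The sketch below is illustrative only: `LimitedCoalescer` and
`PushStatus` are stand-ins for DataFusion's internal `LimitedBatchCoalescer`
and `PushBatchStatus` (which operate on Arrow record batches rather than bare
row counts), and the exact truncation behavior is an assumption based on the
stream changes above.

#[derive(Debug, PartialEq)]
enum PushStatus {
    Continue,
    LimitReached,
}

// Minimal model of a fetch-absorbing output buffer (illustrative, not the
// DataFusion API).
struct LimitedCoalescer {
    fetch: Option<usize>, // maximum rows to emit; None means unlimited
    emitted: usize,       // rows accepted so far
    finished: bool,
}

impl LimitedCoalescer {
    fn new(fetch: Option<usize>) -> Self {
        Self { fetch, emitted: 0, finished: false }
    }

    // Accept `rows` more rows, truncating at the fetch limit. This mirrors
    // how HashJoinStream reacts to `push_batch`: once `LimitReached` is
    // returned, the stream calls `finish()` and moves to the Completed state.
    fn push_rows(&mut self, rows: usize) -> PushStatus {
        match self.fetch {
            Some(limit) if self.emitted + rows >= limit => {
                self.emitted = limit; // keep only the rows under the limit
                PushStatus::LimitReached
            }
            _ => {
                self.emitted += rows;
                PushStatus::Continue
            }
        }
    }

    fn finish(&mut self) {
        self.finished = true;
    }
}

fn main() {
    // LIMIT 5 OFFSET 3: the optimizer absorbs fetch = 8 into the join and
    // keeps GlobalLimitExec for the skip, as in the tests above.
    let mut c = LimitedCoalescer::new(Some(8));
    assert_eq!(c.push_rows(6), PushStatus::Continue);
    assert_eq!(c.push_rows(6), PushStatus::LimitReached); // truncated to 8
    c.finish();
    assert!(c.finished);
    assert_eq!(c.emitted, 8);
}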