Conversation

> run benchmarks

🤖: Benchmark completed (Details)

> run benchmark tpcds

(not sure if there is any query benefiting from this in tpch / tpcds, but those contain joins at least)

> run benchmark tpch

🤖: Benchmark completed (Details)

🤖: Benchmark completed (Details)
2010YOUY01
left a comment
Thank you! The implementation looks very straightforward to me.
I suggest constructing a query that favors this optimization and ensuring the performance improvement can be measured. I don't expect any surprises; this is just to be safe.
```rust
// Null-aware anti join requires seeing ALL probe rows to check for NULLs.
// If any probe row has NULL, the output must be empty.
// We can't stop early or we might miss a NULL and return wrong results.
if self.null_aware {
```
I don't understand this part.
The `output_buffer` will only get filled when an output entry is finalized, so this should be handled automatically?
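For context on why the code comment above says the output must be empty: null-aware anti join is the execution strategy for `NOT IN` subqueries, where a single NULL on the compared side makes every membership test unknown. The following is a minimal standalone sketch of that rule in plain Rust (the function name and types are illustrative, not part of DataFusion):

```rust
/// Null-aware anti join semantics for `left_key NOT IN (right_keys)`:
/// a left row survives only if it matches no right value AND the right
/// side contains no NULL (a NULL makes every comparison unknown).
fn null_aware_anti_join(left: &[Option<i32>], right: &[Option<i32>]) -> Vec<i32> {
    // Any NULL on the compared side empties the result entirely -- this is
    // why the operator must observe ALL such rows before emitting anything.
    if right.iter().any(|v| v.is_none()) {
        return Vec::new();
    }
    left.iter()
        .filter_map(|v| *v) // a NULL left key never satisfies NOT IN either
        .filter(|v| !right.contains(&Some(*v)))
        .collect()
}

fn main() {
    // No NULLs: behaves like a plain anti join.
    assert_eq!(null_aware_anti_join(&[Some(1), Some(2)], &[Some(2)]), vec![1]);
    // One NULL on the right side empties the output entirely.
    assert_eq!(
        null_aware_anti_join(&[Some(1), Some(2)], &[Some(2), None]),
        Vec::<i32>::new()
    );
}
```

This is also why an early-exit limit interacts badly with the null-aware path: stopping before the NULL check is complete could return rows that should have been suppressed.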
```rust
partition_mode: PartitionMode,
null_equality: NullEquality,
null_aware: bool,
/// Maximum number of rows to return
```
```suggestion
/// Maximum number of rows to return
///
/// If the operator produces `< fetch` rows, it returns all available rows.
/// If it produces `>= fetch` rows, it returns exactly `fetch` rows and stops early.
```
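The suggested doc comment describes a `min(rows produced, fetch)` contract. A tiny sketch of that contract over a plain row collection (names here are illustrative, not the DataFusion API):

```rust
/// Apply the documented `fetch` contract: with a limit of `n`, return
/// all rows when fewer than `n` are produced, otherwise exactly `n`.
/// `None` means no limit.
fn apply_fetch(rows: Vec<u64>, fetch: Option<usize>) -> Vec<u64> {
    match fetch {
        Some(n) => rows.into_iter().take(n).collect(), // stop early at n rows
        None => rows,
    }
}

fn main() {
    assert_eq!(apply_fetch(vec![1, 2, 3], Some(5)), vec![1, 2, 3]); // < fetch: all rows
    assert_eq!(apply_fetch(vec![1, 2, 3], Some(2)), vec![1, 2]);    // >= fetch: exactly fetch
    assert_eq!(apply_fetch(vec![1, 2, 3], None), vec![1, 2, 3]);    // no limit
}
```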
```rust
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let stats = estimate_join_statistics(
```
Should the statistics take the fetch/limit into account when estimating?
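One way the question could be answered is to clamp the estimated output row count by the fetch limit. The sketch below models this with a simplified enum loosely inspired by exact/inexact precision levels in planner statistics; it is an assumption about the approach, not DataFusion's actual `Statistics` API:

```rust
/// Simplified stand-in for a planner's row-count estimate.
#[derive(Debug, PartialEq)]
enum RowEstimate {
    Exact(usize),
    Inexact(usize),
    Unknown,
}

/// Clamp an estimate by the fetch limit: the join cannot output more
/// than `fetch` rows, so `fetch` is always a valid upper bound.
fn clamp_to_fetch(est: RowEstimate, fetch: Option<usize>) -> RowEstimate {
    let Some(limit) = fetch else { return est };
    match est {
        // min(n, limit) rows will be produced, with the same certainty.
        RowEstimate::Exact(n) => RowEstimate::Exact(n.min(limit)),
        RowEstimate::Inexact(n) => RowEstimate::Inexact(n.min(limit)),
        // Even with no estimate, the limit itself is an inexact upper bound.
        RowEstimate::Unknown => RowEstimate::Inexact(limit),
    }
}

fn main() {
    assert_eq!(clamp_to_fetch(RowEstimate::Exact(100), Some(10)), RowEstimate::Exact(10));
    assert_eq!(clamp_to_fetch(RowEstimate::Inexact(5), Some(10)), RowEstimate::Inexact(5));
    assert_eq!(clamp_to_fetch(RowEstimate::Unknown, Some(10)), RowEstimate::Inexact(10));
    assert_eq!(clamp_to_fetch(RowEstimate::Exact(100), None), RowEstimate::Exact(100));
}
```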
```rust
    self.fetch
}
```

```rust
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
```
Is this method really needed?
If it is not, then I'd suggest removing it.
If the `HashJoinExec` has already been executed, then setting a new limit will be confusing/inconsistent unless it is re-executed.
Alternatively it could be implemented as:

```rust
HashJoinExecBuilder::from(self)
    .with_fetch(limit)
    .build()
    .ok()
    .map(|exec| Arc::new(exec) as _)
```

This way it won't keep the calculated state.
```rust
),
partition_mode,
self.null_equality(),
self.null_aware,
```
The `fetch` field is not preserved here. Is this intentional?
```rust
None,
*self.partition_mode(),
self.null_equality,
self.null_aware,
```
The `fetch` field is not preserved here. Is this intentional?
**Which issue does this PR close?**

**Rationale for this change**

**What changes are included in this PR?**

Push the limit down into the hash join using the limit pushdown optimizer: the optimizer passes the limit value to `HashJoinExec` via `with_fetch`, and the `fetch` value is passed on to `LimitedBatchCoalescer` to emit the batch once the limit is hit.

**Are these changes tested?**

SLT tests + unit tests
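The "emit the batch once the limit is hit" behavior described above can be sketched as a coalescer that buffers rows and reports completion as soon as `fetch` rows have been collected, so the join can stop pulling input. This is an illustrative stand-in, not the actual `LimitedBatchCoalescer` implementation:

```rust
/// Sketch of a limited coalescer: buffer incoming rows and signal
/// completion once `fetch` rows have been collected.
struct LimitedCoalescer {
    fetch: Option<usize>,
    buffered: Vec<u64>, // stand-in for buffered Arrow rows
    finished: bool,
}

impl LimitedCoalescer {
    fn new(fetch: Option<usize>) -> Self {
        Self { fetch, buffered: Vec::new(), finished: false }
    }

    /// Push a batch of rows; returns true once the limit is reached,
    /// at which point further input can be skipped.
    fn push(&mut self, rows: &[u64]) -> bool {
        if self.finished {
            return true;
        }
        // Only take as many rows as the limit still allows.
        let remaining = self
            .fetch
            .map(|f| f - self.buffered.len())
            .unwrap_or(usize::MAX);
        self.buffered.extend(rows.iter().take(remaining));
        if let Some(f) = self.fetch {
            if self.buffered.len() >= f {
                self.finished = true; // limit hit: emit and stop pulling input
            }
        }
        self.finished
    }
}

fn main() {
    let mut c = LimitedCoalescer::new(Some(3));
    assert!(!c.push(&[1, 2]));   // 2 rows buffered, limit not yet reached
    assert!(c.push(&[3, 4, 5])); // truncated to the limit, done early
    assert_eq!(c.buffered, vec![1, 2, 3]);
}
```

The design point this sketch captures is why the limit must reach the join operator itself: truncating downstream would still force the join to materialize every matching row first.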