-
Notifications
You must be signed in to change notification settings - Fork 1.9k
perfect hash join #19411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
perfect hash join #19411
Conversation
| ) -> Result<Self> { | ||
| // Initialize with 0 (sentinel for not found) | ||
| let mut data: Vec<u32> = vec![0; range]; | ||
| let mut next: Option<Vec<u32>> = None; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| let mut next: Option<Vec<u32>> = None; | |
| let mut next: Vec<u32> = vec![]; |
I think this should work as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, it's cleaner this way
|
run benchmarks |
|
🤖 |
|
run benchmark tpcds |
datafusion/common/src/config.rs
Outdated
| /// | ||
| /// TODO: Currently only supports cases where left_side.num_rows() < u32::MAX. | ||
| /// Support for left_side.num_rows() >= u32::MAX will be added in the future. | ||
| pub perfect_hash_join_min_key_density: f64, default = 0.99 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems very high? For a hashmap I believe it's ~75% default (plus it has some more overhead per key), so I think a 75% probably could still be better overall?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great point.
I'll add some benchmarks to compare the performance at different densities, including 75%, to find the optimal value for our use case. I'll update this based on the results.
Thanks for the suggestion!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can set it to 20%.
- In terms of memory usage (run with this code: ),
ArrayMapconsumes less memory thanJoinHashMapat a 20% density, even with duplicate keys. - Based on the hj.rs benchmark(results in the PR description),
ArrayMapis also faster thanJoinHashMapat 20% density, regardless of whether there are duplicate keys.
Memory Comparison Matrix (num_rows = 1000000)
| Density | Dup Rate | ArrayMap (MB) | JoinHashMap (MB) | Ratio (AM/JHM) |
|---------|----------|---------------|------------------|----------------|
| 100% | 0% | 3.81 | 37.81 | 0.10x |
| 100% | 25% | 7.63 | 37.81 | 0.20x |
| 100% | 50% | 7.63 | 37.81 | 0.20x |
| 100% | 75% | 7.63 | 37.81 | 0.20x |
| 75% | 0% | 5.09 | 37.81 | 0.13x |
| 75% | 25% | 8.90 | 37.81 | 0.24x |
| 75% | 50% | 8.90 | 37.81 | 0.24x |
| 75% | 75% | 8.90 | 37.81 | 0.24x |
| 50% | 0% | 7.63 | 37.81 | 0.20x |
| 50% | 25% | 11.44 | 37.81 | 0.30x |
| 50% | 50% | 11.44 | 37.81 | 0.30x |
| 50% | 75% | 11.44 | 37.81 | 0.30x |
| 20% | 0% | 19.07 | 37.81 | 0.50x |
| 20% | 25% | 22.89 | 37.81 | 0.61x |
| 20% | 50% | 22.89 | 37.81 | 0.61x |
| 20% | 75% | 22.89 | 37.81 | 0.61x |
| 10% | 0% | 38.15 | 37.81 | 1.01x |
| 10% | 25% | 41.96 | 37.81 | 1.11x |
| 10% | 50% | 41.96 | 37.81 | 1.11x |
| 10% | 75% | 41.96 | 37.81 | 1.11x |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about ~15%?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Setting it to ~15% works well—it not only improves performance but also reduces the memory footprint (space usage). I've updated the threshold.
Memory Comparison Matrix (num_rows = 1000000)
| Density | Dup Rate | ArrayMap (MB) | JoinHashMap (MB) | Ratio (AM/JHM) |
|---------|----------|---------------|------------------|----------------|
...
| 15% | 0% | 25.43 | 37.81 | 0.67x |
| 15% | 25% | 29.25 | 37.81 | 0.77x |
| 15% | 50% | 29.25 | 37.81 | 0.77x |
| 15% | 75% | 29.25 | 37.81 | 0.77x |
|
🤖: Benchmark completed Details
|
|
🤖 |
|
🤖: Benchmark completed Details
|
| comparison = BenchmarkRun.load_from_file(comparison_path) | ||
|
|
||
| console = Console() | ||
| console = Console(width=200) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've increased the console width to 200. I added more information like 'density' to the queryName, which made it longer and caused it to be cut off in the output before
| baseline_header = baseline_path.parent.stem | ||
| comparison_header = comparison_path.parent.stem | ||
| baseline_header = baseline_path.parent.name | ||
| comparison_header = comparison_path.parent.name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before, a path like .../density=0.1/... was incorrectly shortened to density=0. Now, by using .parent.name, we correctly get the full directory name, density=0.1
8044bce to
954f1df
Compare
|
Can you solve the conflicts? |
37cd7ad to
2580507
Compare
I've resolved the conflicts. Once #19602 is merged, I will update |
Done |
I have added contain_keys to ArrayMap and updated the logic accordingly. |
|
run benchmarks |
|
🤖 |
|
run benchmark tpch |
|
run benchmark tpcds |
|
🤖: Benchmark completed Details
|
|
🤖 |
|
🤖: Benchmark completed Details
|
|
🤖 |
|
Benchmark script failed with exit code 1. Last 10 lines of output: Click to expand |
Very nice! |
|
run benchmark hj |
|
🤖 Hi @Dandandan, thanks for the request (#19411 (comment)).
Please choose one or more of these with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces a "perfect hash join" optimization for DataFusion's hash join implementation. When join keys are dense integers within a limited range, an array-based direct mapping (ArrayMap) is used instead of a general-purpose HashMap, resulting in significant performance improvements.
Key Changes:
- Introduces
ArrayMapstruct for dense integer join key mapping using direct array indexing - Adds two configuration parameters:
perfect_hash_join_small_build_threshold(default: 1024) andperfect_hash_join_min_key_density(default: 0.2) - Refactors hash join code to conditionally use
ArrayMaporHashMapvia a newMapenum
Reviewed changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/physical-plan/src/joins/array_map.rs | New file implementing ArrayMap for dense integer join keys with support for signed/unsigned integers via wrapping arithmetic |
| datafusion/physical-plan/src/joins/chain.rs | Extracted chain traversal logic shared between ArrayMap and JoinHashMap |
| datafusion/physical-plan/src/joins/mod.rs | Adds Map enum to unify HashMap and ArrayMap interfaces |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Implements try_create_array_map logic and integrates perfect hash join into build phase |
| datafusion/physical-plan/src/joins/hash_join/stream.rs | Updates stream processing to handle both HashMap and ArrayMap variants |
| datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | Updates bounds tracking to use Map instead of JoinHashMapType |
| datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs | Updates HashTableLookupExpr to support both map types |
| datafusion/physical-plan/src/joins/join_hash_map.rs | Changes JoinHashMapOffset to MapOffset and extracts chain traversal |
| datafusion/physical-plan/src/joins/stream_join_utils.rs | Updates to use new MapOffset type |
| datafusion/common/src/config.rs | Adds two new configuration parameters for perfect hash join tuning |
| docs/source/user-guide/configs.md | Documents the new configuration parameters |
| datafusion/sqllogictest/test_files/information_schema.slt | Updates expected configuration output |
| datafusion/proto/tests/cases/roundtrip_physical_plan.rs | Updates serialization tests to handle new Map enum |
| datafusion/physical-plan/Cargo.toml | Adds num-traits dependency for numeric type conversions |
| benchmarks/src/hj.rs | Rewrites hash join benchmarks to use TPC-H data with varying densities |
| benchmarks/bench.sh | Updates to generate TPC-H SF10 data for hash join benchmarks |
| benchmarks/compare.py | Formatting improvements for wider query names |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result<Self> { | ||
| let range = max_val.wrapping_sub(min_val); | ||
| let size = (range + 1) as usize; |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cast (range + 1) as usize on line 163 could potentially overflow if range is close to u64::MAX. While the threshold checks in try_create_array_map should prevent extremely large ranges, there's no explicit check here. Consider adding a bounds check or documenting the assumption that range is validated before calling this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a check to prevent potential overflow when calculating size.
| pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize) -> usize { | ||
| let range = Self::calculate_range(min_val, max_val); | ||
| let size = (range + 1) as usize; |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to try_new, the cast (range + 1) as usize on line 148 could overflow if range is close to u64::MAX. Since this function is used to estimate memory before allocation, an overflow could lead to allocating much less memory than needed or wrapping to a very small value. Consider adding validation or using checked arithmetic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a check to prevent potential overflow when calculating size.
| session_config | ||
| .options_mut() | ||
| .execution | ||
| .perfect_hash_join_min_key_density = 1.0 / 0.0; |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The division by zero 1.0 / 0.0 is used to create positive infinity for disabling perfect hash join. While this works in Rust (floating-point division by zero produces infinity, not a panic), it's unclear and unconventional. Consider using f64::INFINITY for better readability and clearer intent.
| .perfect_hash_join_min_key_density = 1.0 / 0.0; | |
| .perfect_hash_join_min_key_density = f64::INFINITY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accepted. Using f64::INFINITY is indeed much clearer.
| /// (also known as a "perfect hash join") instead of a general-purpose hash map. | ||
| /// This optimization is used when: | ||
| /// 1. There is exactly one join key. | ||
| /// 2. The join key can be any integer type convertible to u64 (excluding i128 and u128). |
Copilot
AI
Jan 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation refers to "excluding i128 and u128" types, but these types are not actually checked or handled anywhere in the code. The is_supported_type function in array_map.rs only supports up to 64-bit integers (Int8-Int64, UInt8-UInt64). Consider removing the mention of i128/u128 from the documentation or clarifying that they're not supported due to design limitations, not just current implementation.
| /// 2. The join key can be any integer type convertible to u64 (excluding i128 and u128). | |
| /// 2. The join key is an integer type up to 64 bits wide that can be losslessly converted | |
| /// to `u64` (128-bit integer types such as `i128` and `u128` are not supported). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've applied the suggested change
Which issue does this PR close?
Rationale for this change
This PR introduces a Perfect Hash Join optimization by using an array-based direct mapping(
ArrayMap) instead of a HashMap.The array-based approach outperforms the standard Hash Join when the build-side keys are dense (i.e., the ratio of
count / (max - min+1)is high) or when the key range(max - min)is sufficiently small.The following results from the hj.rs benchmark suite. The benchmark was executed with the optimization enabled by setting
DATAFUSION_EXECUTION_PERFECT_HASH_JOIN_MIN_KEY_DENSITY=0.1The following results from tpch-sf10
What changes are included in this PR?
collect_left_input(build) phase, we now conditionally use anArrayMapinstead of a standardJoinHashMapType. This optimization is triggered only when the following conditions are met:u32::MAXArrayMapworks by storing the minimum key as an offset and using a Vec to directly map a keykto its build-side index viadata[k- offset].HashMap performance across varying key densities and probe hit rates
Are these changes tested?
Yes
Are there any user-facing changes?
Yes, this PR introduces two new session configuration parameters to control the behavior of the Perfect Hash Join optimization:
perfect_hash_join_small_build_threshold: This parameter defines the maximum key range (max_key - min_key) for the build side to be considered "small." If the key range is below this threshold, the array-based join will be triggered regardless of key density.perfect_hash_join_min_key_density: This parameter sets the minimum density (row_count / key_range) required to enable the perfect hash join optimization for larger key ranges