
feat: Add statistics-based guards to SortMergeJoin-to-HashJoin rewrite [experimental] #3554

Draft
andygrove wants to merge 2 commits into apache:main from andygrove:smj-rewrite-size-guards


@andygrove
Member

Summary

  • Add per-partition size check and size ratio check to RewriteJoin, mirroring Spark's own JoinSelection logic (canBuildLocalHashMapBySize() and muchSmaller())
  • Previously, enabling spark.comet.exec.replaceSortMergeJoin would unconditionally rewrite all SortMergeJoins to ShuffledHashJoins, risking OOM when the build side is too large
  • Now the rewrite only happens when the build side is estimated to fit in memory and is at least sizeRatio times smaller than the probe side (3x by default)
  • Add new config spark.comet.exec.replaceSortMergeJoin.sizeRatio (default: 3) matching Spark's SHUFFLE_HASH_JOIN_FACTOR
  • Log reasons when rewrite is skipped (visible with explainFallback.enabled=true)

Details

Per-partition size check: buildSize < autoBroadcastJoinThreshold * numShufflePartitions

  • Reuses Spark's existing configs — no new threshold to configure
  • When autoBroadcastJoinThreshold = -1 (broadcast disabled), this check is skipped
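
A minimal sketch of the per-partition size check, written in Python purely for illustration. The actual rule is part of Comet's RewriteJoin. The function and parameter names below are hypothetical and only mirror the formula stated above.

```python
def fits_per_partition(build_size: int,
                       broadcast_threshold: int,
                       num_shuffle_partitions: int) -> bool:
    """Return True if the build side passes the per-partition size check.

    Mirrors: buildSize < autoBroadcastJoinThreshold * numShufflePartitions.
    All sizes are in bytes. Names are illustrative, not Comet's API.
    """
    if broadcast_threshold == -1:
        # Broadcast disabled: the PR description says this check is skipped.
        return True
    return build_size < broadcast_threshold * num_shuffle_partitions


# Spark's default settings: 10 MiB threshold, 200 shuffle partitions.
limit = 10 * 1024**2 * 200  # 2,097,152,000 bytes
print(fits_per_partition(1 * 1024**3, 10 * 1024**2, 200))  # 1 GiB is below the limit
print(fits_per_partition(2 * 1024**3, 10 * 1024**2, 200))  # 2 GiB is above the limit
```

With those defaults, the build side must be estimated at under 2,097,152,000 bytes (about 1.95 GiB) in total for the check to pass.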

Size ratio check: buildSize * sizeRatio <= probeSize

  • Default ratio of 3 matches Spark's spark.sql.shuffledHashJoinFactor
  • Ensures hash join is only used when it has a clear advantage over sort-merge
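
The ratio check can be sketched the same way. This is an illustrative Python version of the inequality above, not the code in the PR. The name `much_smaller` is borrowed from Spark's `muchSmaller()`, and the parameter names are assumptions.

```python
def much_smaller(build_size: int, probe_size: int, size_ratio: int = 3) -> bool:
    """Return True if the build side is small enough relative to the probe side.

    Mirrors: buildSize * sizeRatio <= probeSize, with a default ratio of 3.
    """
    return build_size * size_ratio <= probe_size


print(much_smaller(100, 300))  # exactly 3x smaller: passes
print(much_smaller(101, 300))  # slightly less than 3x smaller: fails
```

Because the comparison is `<=`, a build side exactly `sizeRatio` times smaller than the probe side still qualifies.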

Safe fallback: When no logical plan statistics are available, the rewrite is skipped conservatively.
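
Putting the three rules together, the overall decision might look like the following sketch. It is an illustrative Python model under stated assumptions, not the Scala code in this PR: `should_rewrite`, its parameters, and the reason strings are all hypothetical, and `None` stands in for missing logical plan statistics.

```python
from typing import Optional, Tuple


def should_rewrite(build_size: Optional[int],
                   probe_size: Optional[int],
                   broadcast_threshold: int,
                   num_shuffle_partitions: int,
                   size_ratio: int = 3) -> Tuple[bool, str]:
    """Decide whether to rewrite a SortMergeJoin to a hash join.

    Returns (decision, reason). Reason strings are placeholders, not the
    messages actually logged by Comet.
    """
    # Safe fallback: without statistics, keep the SortMergeJoin.
    if build_size is None or probe_size is None:
        return False, "no statistics available"

    # Per-partition size check, skipped when broadcast is disabled (-1).
    if (broadcast_threshold != -1
            and build_size >= broadcast_threshold * num_shuffle_partitions):
        return False, "build side too large"

    # Size ratio check.
    if build_size * size_ratio > probe_size:
        return False, "build side not much smaller than probe side"

    return True, "rewrite"


mib = 1024**2
print(should_rewrite(None, 500 * mib, 10 * mib, 200))
print(should_rewrite(100 * mib, 500 * mib, 10 * mib, 200))
print(should_rewrite(100 * mib, 200 * mib, 10 * mib, 200))
```

The order of the checks here is a design choice of the sketch; the PR description does not say which check runs first.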

Test plan

  • Verify existing CometJoinSuite tests still pass
  • Test with TPC-H/TPC-DS to confirm joins are correctly selected
  • Test with explainFallback.enabled=true to verify skip reasons are logged
  • Test edge case: autoBroadcastJoinThreshold=-1 still allows rewrite when size ratio is met

🤖 Generated with Claude Code

Previously, enabling `spark.comet.exec.replaceSortMergeJoin` would
unconditionally rewrite all SortMergeJoins to ShuffledHashJoins,
which could cause OOM when the build side is too large to fit in
memory.

This adds two checks mirroring Spark's own JoinSelection logic:

1. Per-partition size check: the build side must fit in memory using
   Spark's `autoBroadcastJoinThreshold * numShufflePartitions` formula
   (matching `canBuildLocalHashMapBySize()`).

2. Size ratio check: the build side must be significantly smaller than
   the probe side, controlled by a new config
   `spark.comet.exec.replaceSortMergeJoin.sizeRatio` (default: 3),
   matching Spark's `SHUFFLE_HASH_JOIN_FACTOR`.

When either check fails, the SortMergeJoin is kept and the reason is
logged via `withInfo` (visible with `explainFallback.enabled=true`).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@andygrove changed the title from "Add statistics-based guards to SortMergeJoin-to-HashJoin rewrite" to "feat: Add statistics-based guards to SortMergeJoin-to-HashJoin rewrite [experimental]" on Feb 20, 2026
@andygrove
Member Author

@sqlbenchmark run tpch
