[SPARK-55131][SS] Change the default merge operator delimiter for RocksDB to empty string to concat without delimiter#54083
HeartSaVioR wants to merge 11 commits into apache:master
JIRA Issue Information: Bug SPARK-55131
| buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion") | ||
| .internal() | ||
| .doc("Set the RocksDB merge operator version. This will be stored in the checkpoint when " + | ||
| "starting a streaming query. The checkpoint will use this merge operator version in the " + |
| rocksDbOptions.setAvoidFlushDuringShutdown(true) | ||
| rocksDbOptions.setMergeOperator(new StringAppendOperator()) | ||
| // Set merge operator based on version for backward compatibility | ||
| // Version 1: comma delimiter ",", Version 2: empty string "" |
Should we note that this is not documented but is supported in RocksDB?
| case v => throw new IllegalArgumentException( | ||
| s"Invalid merge operator version: $v. Supported versions are 1 and 2") | ||
| } | ||
| rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter)) |
This is a little different from the default constructor we were using earlier. Will it be equivalent?
The only difference is ',' vs. ",", and the effect should be the same. There is a test confirming this: merge("key1", "a") -> merge("key1", "b") -> get("key1") returns "a,b" with version 1 and "ab" with version 2.
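The behavior described in this thread can be sketched without RocksDB itself. The following is a minimal illustration, not the code in this PR: `AppendStore` is a hypothetical in-memory stand-in that assumes a string-append merge returns the bare value when the key is absent and `existing + delimiter + value` otherwise. The version-to-delimiter mapping and the error message follow the diff comments above.

```scala
object MergeDelimiterSketch {
  // Mapping as described in the diff comments:
  // version 1 -> comma delimiter ",", version 2 -> empty string "".
  def delimiterFor(version: Int): String = version match {
    case 1 => ","
    case 2 => ""
    case v => throw new IllegalArgumentException(
      s"Invalid merge operator version: $v. Supported versions are 1 and 2")
  }

  // Hypothetical in-memory stand-in for a store with string-append merge
  // semantics (an assumption for illustration, not the RocksDB API).
  final class AppendStore(delimiter: String) {
    private val data = scala.collection.mutable.Map.empty[String, String]

    // Append to the existing value with the delimiter, or store the bare
    // value when the key is absent.
    def merge(key: String, value: String): Unit = {
      val merged = data.get(key).map(_ + delimiter + value).getOrElse(value)
      data(key) = merged
    }

    def get(key: String): Option[String] = data.get(key)
  }

  // Replays the sequence from the review comment:
  // merge("key1", "a") -> merge("key1", "b") -> get("key1")
  def mergedValue(version: Int): String = {
    val store = new AppendStore(delimiterFor(version))
    store.merge("key1", "a")
    store.merge("key1", "b")
    store.get("key1").get
  }
}
```

With these assumptions, `MergeDelimiterSketch.mergedValue(1)` yields `"a,b"` and `MergeDelimiterSketch.mergedValue(2)` yields `"ab"`, matching the test described above.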
| // Version 2: Uses empty string "" as delimiter (no delimiter, direct concatenation) | ||
| // | ||
| // Note: this is also defined in `SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`. | ||
| // These two places should be updated together. |
Why do we need this in two places?
We need the SQLConf to do the offset log trick. At the same time, it is easier for us to have the same entry here so that the conf is automatically taken into account for RocksDBConf. We did this earlier with SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION and FORMAT_VERSION.
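To make the "offset log trick" concrete, here is a rough sketch of the resolution rule it implies. This is an assumption-laden illustration rather than Spark's implementation: the `resolve` helper, the plain `Map` standing in for checkpoint metadata, and the legacy fallback of version 1 for checkpoints that never recorded the conf are all hypothetical; only the conf key and the two version numbers come from this PR.

```scala
object MergeOperatorVersionResolution {
  val ConfKey = "spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion"

  // Assumed for illustration: checkpoints written before this change have no
  // entry for the conf and must keep the comma delimiter (version 1), while
  // new queries pick up the new default (version 2).
  val LegacyVersion = 1
  val NewDefaultVersion = 2

  // checkpointConf is None for a brand-new query, or Some(confs recorded in
  // the offset log metadata) when restarting from an existing checkpoint.
  def resolve(
      checkpointConf: Option[Map[String, String]],
      sessionConf: Map[String, String]): Int = checkpointConf match {
    case Some(recorded) =>
      // Existing checkpoint: the recorded value wins; if it is missing,
      // fall back to the legacy version instead of the session value.
      recorded.get(ConfKey).map(_.toInt).getOrElse(LegacyVersion)
    case None =>
      // New query: use the session value, or the new default.
      sessionConf.get(ConfKey).map(_.toInt).getOrElse(NewDefaultVersion)
  }
}
```

The point of the sketch is that the session value is ignored once a checkpoint exists, which is what keeps old state readable with the delimiter it was written with.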
| // include the number of delimiters used for merge | ||
| resultSize += numValues - 1 | ||
| resultSize += (numValues - 1) * delimiterSize |
For an empty delimiter, would resultSize be 0 then?
No, we just do not add any size for the delimiters. The resultSize calculated in the logic above stays the same.
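The arithmetic in this exchange can be checked with a small standalone sketch. It is a simplification under stated assumptions: `mergedSize` is a hypothetical helper, and `valueSizes` stands for whatever per-value byte counts the real code accumulates before the delimiter adjustment.

```scala
object MergedSizeSketch {
  // Total size of numValues merged values: the sum of the value sizes plus
  // one delimiter between each adjacent pair of values.
  def mergedSize(valueSizes: Seq[Int], delimiterSize: Int): Int = {
    val numValues = valueSizes.length
    var resultSize = valueSizes.sum
    // Include the bytes taken by the delimiters used for merge.
    if (numValues > 0) {
      resultSize += (numValues - 1) * delimiterSize
    }
    resultSize
  }
}
```

For three 4-byte values, `mergedSize(Seq(4, 4, 4), 1)` gives 14 (version 1, one-byte comma) and `mergedSize(Seq(4, 4, 4), 0)` gives 12 (version 2, empty delimiter). Only the delimiter term drops to zero.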
| assert(valueRowToData(iterator0.next()) === 1) | ||
| assert(!iterator0.hasNext) | ||
| merge(store, "a", 0, 2) |
Can we also add a test for some combination of put and merge followed by get, as needed, to verify the result for both merge delimiter versions?
I'll add this tomorrow. This is the only review comment I have yet to address.
What changes were proposed in this pull request?
This PR proposes changing the default delimiter of the RocksDB merge operator to an empty string, so that a merge operation concatenates the two values directly without inserting any delimiter character.
Changing the delimiter is not compatible with existing checkpoints, so this change is coupled with a SQLConf and uses the known offset log metadata trick to apply the change only to new streaming queries.
spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion
Why are the changes needed?
We found that, with the current delimiter, there is no way to distinguish two cases: 1) a put against a non-existent value followed by a merge, and 2) a merge against a non-existent value followed by a merge. There has been an implicit expectation that operators call merge only when they know the operation is against an existing key. This effectively requires a GET operation, which can have a significant performance impact depending on the logic.
Changing the delimiter to an empty string (none) eliminates the difference between the two cases, allowing operators to perform a blind merge without checking whether the key exists.
Does this PR introduce any user-facing change?
No, the change is internal and there is no user-facing change.
How was this patch tested?
Added UTs.
Was this patch authored or co-authored using generative AI tooling?
Co-authored by claude-4.5-sonnet