[SPARK-55131][SS] Change the default merge operator delimiter for RocksDB to empty string to concat without delimiter #54083

Open
HeartSaVioR wants to merge 11 commits into apache:master from HeartSaVioR:SPARK-55131

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR proposes to change the default delimiter for the RocksDB merge operator to an empty string, so that a merge operation concatenates the two values directly without inserting any delimiter character.

Changing the delimiter is not compatible with existing checkpoints, so this change is gated by a SQLConf and uses the known offset log metadata trick to apply the new default only to new streaming queries.

  • New SQL config: spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion
  • Default: 2 ('' as delimiter)
  • Default for existing checkpoints: 1 (',' as delimiter)
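
The version selection described above can be pictured roughly as follows. This is a minimal sketch, not the PR's code: `resolveMergeOperatorVersion`, its parameters, and the idea of passing the checkpoint's recorded confs as an `Option[Map]` are assumptions made for illustration. Only the config key and the two default values come from the description.

```scala
object MergeOperatorVersionResolution {
  // Config key taken from the PR description.
  val ConfKey = "spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion"

  // Defaults stated in the PR description.
  val DefaultForNewQueries = 2          // "" as delimiter
  val DefaultForExistingCheckpoints = 1 // "," as delimiter

  /**
   * Hypothetical helper. `checkpointConfs` is None for a brand-new query and
   * Some(confs recorded in the offset log) when restarting from a checkpoint.
   */
  def resolveMergeOperatorVersion(
      checkpointConfs: Option[Map[String, String]],
      sessionConfs: Map[String, String]): Int = checkpointConfs match {
    // New query: use the session value, falling back to the new default.
    case None =>
      sessionConfs.get(ConfKey).map(_.toInt).getOrElse(DefaultForNewQueries)
    // Restart: the recorded value wins. A checkpoint written before this
    // config existed has no entry, so it keeps the old behavior.
    case Some(recorded) =>
      recorded.get(ConfKey).map(_.toInt).getOrElse(DefaultForExistingCheckpoints)
  }
}
```

Under these assumptions, a restarted query never picks up a changed session value, which is what keeps the on-disk merged values readable with the delimiter they were written with.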

Why are the changes needed?

We found that the current delimiter gives no way to distinguish two cases: 1) a put against a non-existent value followed by a merge, and 2) a merge against a non-existent value followed by a merge. There has been an implicit assumption that operators call merge only when they know the key already exists. This effectively requires a GET operation, which can have a significant performance impact depending on the logic.

Setting the delimiter to an empty string (i.e. no delimiter) eliminates the difference between the two cases, allowing operators to perform a blind merge without checking whether the key exists.
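
To illustrate the cost difference between the two call patterns, here is a small sketch. `CountingStore`, `guardedAppend`, and `blindAppend` are hypothetical names, and the in-memory store only models the version 2 behavior (plain concatenation). It is not Spark's state store API.

```scala
import scala.collection.mutable

object BlindMergeSketch {
  // Hypothetical in-memory stand-in for a state store that counts GET calls.
  final class CountingStore {
    private val data = mutable.Map.empty[String, String]
    var getCount: Int = 0

    def get(key: String): Option[String] = {
      getCount += 1
      data.get(key)
    }

    def put(key: String, value: String): Unit = data.update(key, value)

    // Models version 2 only: append with no delimiter.
    def merge(key: String, value: String): Unit =
      data.update(key, data.getOrElse(key, "") + value)

    // Inspection helper that does not count as a GET.
    def peek(key: String): Option[String] = data.get(key)
  }

  // Guarded pattern: check existence first, then choose put or merge.
  def guardedAppend(store: CountingStore, key: String, value: String): Unit =
    if (store.get(key).isDefined) store.merge(key, value) else store.put(key, value)

  // Blind pattern: always merge, never read.
  def blindAppend(store: CountingStore, key: String, value: String): Unit =
    store.merge(key, value)
}
```

In this model both patterns leave the same stored value, but the guarded pattern issues one GET per append while the blind pattern issues none.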

Does this PR introduce any user-facing change?

No, the change is internal and there is no user-facing change.

How was this patch tested?

Added UTs.

Was this patch authored or co-authored using generative AI tooling?

Co-authored by claude-4.5-sonnet

@github-actions
github-actions bot commented Feb 1, 2026

JIRA Issue Information

=== Bug SPARK-55131 ===
Summary: The default delimiter of StringAppendOperator (merge operator for RocksDB) conflicts when merge is used with non-existence value
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion")
.internal()
.doc("Set the RocksDB merge operator version. This will be stored in the checkpoint when " +
"starting a streaming query. The checkpoint will use this merge operator version in the " +
Contributor

nit: for the entire

  rocksDbOptions.setAvoidFlushDuringShutdown(true)
- rocksDbOptions.setMergeOperator(new StringAppendOperator())
+ // Set merge operator based on version for backward compatibility
+ // Version 1: comma delimiter ",", Version 2: empty string ""
Contributor

Should we note that this is not documented but is supported in RocksDB?

case v => throw new IllegalArgumentException(
s"Invalid merge operator version: $v. Supported versions are 1 and 2")
}
rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))
Contributor

This is a little different from the default constructor we were using earlier. Will this be equivalent?

Contributor Author

It is ',' vs "," and should have the same effect. There is a test confirming this: merge("key1", "a") -> merge("key1", "b") -> get("key1") returns "a,b" with version 1 and "ab" with version 2.
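
The behavior described in this reply can be modeled in plain Scala. This is only a sketch of the expected results, not a call into RocksDB: `delimiterFor` mirrors the version mapping and error message quoted in the diff, while `mergeAll` is a hypothetical helper that assumes the first operand is stored as-is and later operands are joined with the delimiter.

```scala
object MergeDelimiterSketch {
  // Version-to-delimiter mapping as described in the diff comments.
  def delimiterFor(version: Int): String = version match {
    case 1 => ","
    case 2 => ""
    case v => throw new IllegalArgumentException(
      s"Invalid merge operator version: $v. Supported versions are 1 and 2")
  }

  // Pure-Scala model of the append behavior (not a RocksDB call):
  // operands are joined in order with the version's delimiter.
  def mergeAll(version: Int, operands: Seq[String]): String =
    operands.mkString(delimiterFor(version))
}
```

For example, `MergeDelimiterSketch.mergeAll(1, Seq("a", "b"))` gives `"a,b"` and `MergeDelimiterSketch.mergeAll(2, Seq("a", "b"))` gives `"ab"`, matching the test results mentioned above.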

// Version 2: Uses empty string "" as delimiter (no delimiter, direct concatenation)
//
// Note: this is also defined in `SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`.
// These two places should be updated together.
Contributor

Why do we need this in two places?

Contributor Author

We need the SQLConf to do the offset log trick. At the same time, it is easier for us to have the same entry here so that RocksDBConf automatically takes the conf into account. We did this earlier with SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION and FORMAT_VERSION.


  // include the number of delimiters used for merge
- resultSize += numValues - 1
+ resultSize += (numValues - 1) * delimiterSize
Contributor

For an empty delimiter, resultSize would be 0 then?

Contributor Author

We simply do not add any size for delimiters. The resultSize calculated in the logic above remains the same.
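
A worked version of the size arithmetic may help here. This is a sketch under the assumption that the earlier logic (not shown in the diff excerpt) accumulates the byte size of each value into `resultSize`. `mergedSize` and `valueSizes` are hypothetical names.

```scala
object MergedSizeSketch {
  // Total size in bytes of a merged value: the payload bytes plus one
  // delimiter between each pair of adjacent values.
  def mergedSize(valueSizes: Seq[Int], delimiterSize: Int): Int = {
    val numValues = valueSizes.length
    val payload = valueSizes.sum
    if (numValues == 0) 0 else payload + (numValues - 1) * delimiterSize
  }
}
```

With three 4-byte values, a one-byte delimiter (version 1) gives 4 + 4 + 4 + 2 = 14 bytes, while an empty delimiter (version 2) gives 12 bytes. The delimiter term drops to zero but the payload term is unchanged.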

assert(valueRowToData(iterator0.next()) === 1)
assert(!iterator0.hasNext)

merge(store, "a", 0, 2)
Contributor

Can we also add a test for some combination of put and merge followed by get, to verify the result for both merge delimiter versions?

Contributor Author

I'll add this tomorrow. This is the only review comment I have yet to address.
