feat: Support COW bulk-insert, insert, upsert, delete works with spark datasource and lance #17731
Conversation
 * This writer is used for bulk insert operations and other optimized write paths that work
 * directly with InternalRow objects without HoodieRecord wrappers.
 */
public class HoodieInternalRowLanceWriter extends HoodieBaseLanceWriter&lt;InternalRow&gt;
Is it possible to update the HoodieSparkLanceWriter to implement the HoodieInternalRowFileWriter so we don't need to duplicate any of the writer logic or configuration in the future?
Will look into this.
Force-pushed from 65ad93d to 88904d7
public ClosableIterator&lt;HoodieRecord&lt;InternalRow&gt;&gt; getRecordIterator(HoodieSchema schema) throws IOException {
  ClosableIterator&lt;UnsafeRow&gt; iterator = getUnsafeRowIterator(schema);
  return new CloseableMappingIterator&lt;&gt;(iterator, data -> unsafeCast(new HoodieSparkRecord(data)));
  //TODO .copy() is needed for correctness, to investigate further in future.
@rahil-c what is the status of this TODO?
Currently this TODO is about the need for this .copy() workaround. I have filed a ticket with more findings here: #17754.
Can we just solve this as part of this? I am getting worried about the number of follow on tasks for the baseline features here. If it uses some shared buffer, then you need to copy. It is similar to other spark iterators that we have. If it is some setup issue, then fix that first and see if the copy is still required.
Currently in our code we leverage the following UnsafeProjection when converting an InternalRow to an UnsafeRow: https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java#L132.

I went through the Spark docs looking for more detail on UnsafeProjection, but did not find any, so I examined the relevant classes in the Spark repo to get more insight into the behavior. In the following class I can see a mention of a shared buffer:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala

with a recommendation for the following:

> This class reuses the [[UnsafeRow]] it produces, a consumer should copy the row if it is being buffered.

> It generates the code for all the expressions, computes the total length for all the columns (can be accessed via variables), and then copies the data into a scratch buffer space in the form of UnsafeRow (the scratch buffer will grow as needed).
>
> @note The returned UnsafeRow will be pointed to a scratch buffer inside the projection.

Based on what you mentioned above:

> If it uses some shared buffer, then you need to copy.

Since we do need the copy, I'm thinking we should place the .copy() inside LanceRecordIterator's next(), so that callers do not have to worry about calling .copy() on the data themselves, as I was doing before in this specific read path:
```java
@Override
public UnsafeRow next() {
  if (!hasNext()) {
    throw new IllegalStateException("No more records available");
  }
  InternalRow row = rowIterator.next();
  // Convert to UnsafeRow immediately while batch is still open
  return projection.apply(row).copy();
}
```
assertEquals(3, commitCount, "Should have 3 completed commits (one per insert)")

// Verify that all commits are bulk_insert commits
val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.iterator().asScala.toList
I think you can skip the iterator and just convert the Java list to a Scala one directly.
.orderBy("id")
.collect()

// Verify we have exactly 3 records (only from third commit)
For some of these verifications, it seems like we should be able to create a list of rows with expected values to make the validation a bit less verbose and easy to evolve in the future.
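One possible shape for that suggestion, sketched in plain Java rather than the actual Scala test code (the row values and helper names here are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: validate collected rows against one expected list
// instead of asserting field by field. Rows are simplified to "id,name,value"
// strings; in the real test these would be Spark Row objects.
public class ExpectedRowsDemo {
    static List<String> expectedRows() {
        return Arrays.asList("7,row7,70", "8,row8,80", "9,row9,90");
    }

    // Stand-in for df.orderBy("id").collect() rendered as strings
    static List<String> collectedRows() {
        return Arrays.asList("7,row7,70", "8,row8,80", "9,row9,90");
    }

    public static void main(String[] args) {
        // A single equality check replaces a block of per-field assertions,
        // and evolving the schema only means editing the expected list.
        if (!expectedRows().equals(collectedRows())) {
            throw new AssertionError("rows mismatch: " + collectedRows());
        }
        System.out.println("rows match");
    }
}
```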
...-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
    Map&lt;String, String&gt; paramsMap) throws IOException {
  throw new UnsupportedOperationException("serializeRecordsToLogBlock with iterator is not yet supported for Lance format");
}
} (no newline at end of file)
Add a trailing newline here.
    StructType sparkSchema,
    TaskContextSupplier taskContextSupplier,
    HoodieStorage storage) throws IOException {
  this(file, sparkSchema, "0", taskContextSupplier, storage, false);
Let's make the instant time null instead of "0" so it is clear it is not set
@the-other-tim-brown @voonhous I have retriggered the Azure CI. Hoping to get an approval so we can merge once the Azure CI is green.

Describe the issue this Pull Request addresses
Feature: #14127
Goal: Write in Hudi using bulk-insert, insert, update, and deletes with Lance files and read back the data with the Spark Datasource on a COW Table
Exit Criteria: We should be able to construct a test that writes out multiple commits with spark and we can read back the same data. Testing should include time travel and incremental queries as well to ensure basic functionality works end to end.
Summary and Changelog
- `InternalRowWriter` interface (which is used by bulk insert): `HoodieInternalRowLanceWriter.java`
- `FileFormatUtils` changes in order to get upsert/delete functionality working correctly
- `TestLanceDataSource` for ensuring that we have full coverage of the above

Impact
None
Risk Level
Low
Documentation Update
None
Contributor's checklist