
Conversation

@rahil-c (Collaborator) commented Dec 28, 2025

Describe the issue this Pull Request addresses

Feature: #14127

Goal: Write to Hudi using bulk insert, insert, update, and delete operations with Lance files, and read the data back with the Spark Datasource on a COW table.

Exit Criteria: We should be able to construct a test that writes out multiple commits with Spark and reads back the same data. Testing should also include time travel and incremental queries to ensure basic functionality works end to end.

Summary and Changelog

  • Implement the InternalRowWriter interface used by bulk insert (HoodieInternalRowLanceWriter.java)
  • Implement FileFormatUtils so that upsert/delete functionality works correctly
  • Add more test cases in TestLanceDataSource to ensure full coverage of the above

Impact

None

Risk Level

Low

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Dec 28, 2025
```java
/**
 * This writer is used for bulk insert operations and other optimized write paths that work
 * directly with InternalRow objects without HoodieRecord wrappers.
 */
public class HoodieInternalRowLanceWriter extends HoodieBaseLanceWriter<InternalRow>
```
Contributor:

Is it possible to update the HoodieSparkLanceWriter to implement the HoodieInternalRowFileWriter so we don't need to duplicate any of the writer logic or configuration in the future?

Collaborator Author:

Will look into this.

@rahil-c rahil-c force-pushed the rahil/hudi-lance-spark-datasource-crud-cow branch from 65ad93d to 88904d7 Compare December 30, 2025 14:24
@rahil-c rahil-c changed the title Support COW bulk-insert, insert, upsert, delete works with spark datasource and lance feat: Support COW bulk-insert, insert, upsert, delete works with spark datasource and lance Dec 30, 2025
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:XL PR with lines of changes > 1000 labels Dec 30, 2025
```java
public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema schema) throws IOException {
  ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator(schema);
  return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data)));
  // TODO: .copy() is needed for correctness; to investigate further in the future.
```
Contributor:

@rahil-c what is the status of this TODO?

Collaborator Author:

Currently this TODO is about the need for this .copy() workaround. I have filed a ticket with more findings here: #17754.

Contributor:

Can we just solve this as part of this PR? I am getting worried about the number of follow-on tasks for the baseline features here. If it uses some shared buffer, then you need to copy. It is similar to other Spark iterators that we have. If it is some setup issue, then fix that first and see if the copy is still required.

Collaborator Author:

Currently our code leverages an UnsafeProjection when converting an InternalRow to an UnsafeRow: https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java#L132.

I went through the Spark docs to see if I could find more on UnsafeProjection, but did not see any documentation for it, so I examined the following classes in the Spark repo to get more insight into the behavior.


When checking InterpretedUnsafeProjection (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala), I can see that there is a mention of a shared buffer, with the following recommendation:

> This class reuses the [[UnsafeRow]] it produces, a consumer should copy the row if it is being buffered.

GenerateUnsafeProjection (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala) says:

> It generates the code for all the expressions, computes the total length for all the columns
> (can be accessed via variables), and then copies the data into a scratch buffer space in the
> form of UnsafeRow (the scratch buffer will grow as needed).
>
> @note The returned UnsafeRow will be pointed to a scratch buffer inside the projection.

Based on what you mentioned above:

> If it uses some shared buffer, then you need to copy.

Since we are leveraging copy, I'm thinking that LanceRecordIterator#next() should be the place that calls .copy(), so that callers do not have to worry about calling .copy() on the data themselves, as I was doing before in this specific read path:

```java
@Override
public UnsafeRow next() {
  if (!hasNext()) {
    throw new IllegalStateException("No more records available");
  }
  InternalRow row = rowIterator.next();
  // Convert to UnsafeRow immediately while the batch is still open;
  // copy() detaches the result from the projection's shared scratch buffer
  return projection.apply(row).copy();
}
```

```scala
assertEquals(3, commitCount, "Should have 3 completed commits (one per insert)")

// Verify that all commits are bulk_insert commits
val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.iterator().asScala.toList
```
Contributor:

I think you can skip the iterator and just go from the Java list to Scala.

```scala
.orderBy("id")
.collect()

// Verify we have exactly 3 records (only from third commit)
```
Contributor:

For some of these verifications, it seems like we should be able to create a list of rows with expected values to make the validation a bit less verbose and easy to evolve in the future.
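The suggested pattern can be sketched with plain Java collections. This is a minimal, self-contained illustration only: the row values and the `ExpectedRowsCheck` class are hypothetical, and in the actual Scala test the rows would come from `df.orderBy("id").collect()`.

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedRowsCheck {
  public static void main(String[] args) {
    // Hypothetical rows collected from the table after the third commit
    List<List<Object>> actual = Arrays.asList(
        Arrays.asList(1, "record-1"),
        Arrays.asList(2, "record-2"),
        Arrays.asList(3, "record-3"));

    // A declarative list of expected values is compact and easy to evolve
    List<List<Object>> expected = Arrays.asList(
        Arrays.asList(1, "record-1"),
        Arrays.asList(2, "record-2"),
        Arrays.asList(3, "record-3"));

    // One equality check replaces a field-by-field verification block
    System.out.println(expected.equals(actual)); // prints true
  }
}
```

When a check fails, comparing whole lists also produces a diff of the full expected vs. actual contents rather than a single mismatched field.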

```java
    Map<String, String> paramsMap) throws IOException {
  throw new UnsupportedOperationException("serializeRecordsToLogBlock with iterator is not yet supported for Lance format");
}
```
(No newline at end of file.)
Contributor:

newline here

```java
    StructType sparkSchema,
    TaskContextSupplier taskContextSupplier,
    HoodieStorage storage) throws IOException {
  this(file, sparkSchema, "0", taskContextSupplier, storage, false);
```
Contributor:

Let's make the instant time null instead of "0" so it is clear it is not set

@hudi-bot (Collaborator):

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@rahil-c (Collaborator Author) commented Dec 31, 2025

@the-other-tim-brown @voonhous I have retriggered the Azure CI; wondering if I can get an approval so we can merge this once Azure CI is green.

@the-other-tim-brown (Contributor):

https://dev.azure.com/apachehudi/hudi-oss-ci/_build/results?buildId=10699&view=results

Azure CI is passing but the GH result is not updating.

@the-other-tim-brown the-other-tim-brown merged commit 2a52448 into apache:master Dec 31, 2025
71 of 72 checks passed