Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ public class ArrowWriter implements AutoCloseable {
/**
* The initial capacity of the vectors which are used to store the rows. The capacity will be
* expanded automatically if the rows exceed the initial capacity.
*
* <p>Public for use by nested writers (e.g., ArrowArrayWriter) to determine when to use safe
* write mode based on element indices.
*/
private static final int INITIAL_CAPACITY = 1024;
public static final int INITIAL_CAPACITY = 1024;

/**
* The buffer usage ratio which is used to determine whether the writer is full. The writer is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.fluss.row.DataGetters;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;

Expand All @@ -41,7 +42,12 @@ public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean handleSa
listVector.startNewValue(rowIndex);
for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) {
int fieldIndex = offset + arrIndex;
elementWriter.write(fieldIndex, array, arrIndex, handleSafe);
// Use element-based index to determine handleSafe, not parent row count.
// This fixes issue #2164: when row count < INITIAL_CAPACITY but total
// array elements > INITIAL_CAPACITY, we need to use safe mode for elements
// beyond the initial capacity.
boolean elementHandleSafe = fieldIndex >= ArrowWriter.INITIAL_CAPACITY;
elementWriter.write(fieldIndex, array, arrIndex, elementHandleSafe);
}
offset += array.size();
listVector.endValue(rowIndex, array.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,70 @@ void testWriterExceedMaxSizeInBytes() {
"The arrow batch size is full and it shouldn't accept writing new rows, it's a bug.");
}
}

/**
 * Regression test for issue #2164: an array column whose cumulative element count grows past
 * INITIAL_CAPACITY (1024) while the row count stays under it. Before the fix,
 * ArrowArrayWriter reused the parent's row-count-based handleSafe flag for element writes,
 * so element indices above the vector's initial capacity triggered an
 * IndexOutOfBoundsException.
 */
@Test
void testArrayWriterWithManyElements() throws IOException {
    // Row schema: an int id plus an int-array column.
    RowType rowType =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.INT())));

    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
            VectorSchemaRoot root =
                    VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
            ArrowWriterPool provider = new ArrowWriterPool(allocator);
            ArrowWriter writer =
                    provider.getOrCreateWriter(
                            1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) {

        // 200 rows x 10 elements = 2000 array entries: more than INITIAL_CAPACITY
        // (1024), while the row count (200) stays below it — so the parent-derived
        // handleSafe flag would be false without the fix.
        final int rowCount = 200;
        final int elementsPerRow = 10;
        for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
            Integer[] values = new Integer[elementsPerRow];
            for (int elemIdx = 0; elemIdx < elementsPerRow; elemIdx++) {
                values[elemIdx] = rowIdx * elementsPerRow + elemIdx;
            }
            writer.writeRow(GenericRow.of(rowIdx, GenericArray.of(values)));
        }

        // Serialization must complete without an IndexOutOfBoundsException.
        AbstractPagedOutputView outputView =
                new ManagedPagedOutputView(new TestingMemorySegmentPool(64 * 1024));
        int serializedSize =
                writer.serializeToOutputView(
                        outputView, arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
        assertThat(serializedSize).isGreaterThan(0);

        // Copy the serialized batch into a heap segment so it can be read back.
        int bufferSize = Math.max(serializedSize, writer.estimatedSizeInBytes());
        MemorySegment target = MemorySegment.allocateHeapMemory(bufferSize);
        MemorySegment source = outputView.getCurrentSegment();
        source.copyTo(
                arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), target, 0, serializedSize);

        ArrowReader reader =
                ArrowUtils.createArrowReader(
                        target, 0, serializedSize, root, allocator, rowType);
        assertThat(reader.getRowCount()).isEqualTo(rowCount);

        // Every array element must round-trip unchanged.
        for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
            ColumnarRow row = reader.read(rowIdx);
            row.setRowId(rowIdx);
            assertThat(row.getInt(0)).isEqualTo(rowIdx);
            assertThat(row.getArray(1).size()).isEqualTo(elementsPerRow);
            for (int elemIdx = 0; elemIdx < elementsPerRow; elemIdx++) {
                assertThat(row.getArray(1).getInt(elemIdx))
                        .isEqualTo(rowIdx * elementsPerRow + elemIdx);
            }
        }
    }
}
}