From 662917a8a5a4e4ed9b8e29888bda46ce9549ffde Mon Sep 17 00:00:00 2001 From: wangyunlai Date: Sun, 27 Apr 2025 12:25:45 +0800 Subject: [PATCH 1/2] fix binaryconsume consuming null value --- .../adapter/jdbc/consumer/BinaryConsumer.java | 25 ++++++++++--------- .../jdbc/consumer/BinaryConsumerTest.java | 16 +++++++++++- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java index edbc6360df..265e968d48 100644 --- a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java +++ b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java @@ -51,13 +51,15 @@ public BinaryConsumer(VarBinaryVector vector, int index) { /** consume a InputStream. */ public void consume(InputStream is) throws IOException { + while (currentIndex >= vector.getValueCapacity()) { + vector.reallocValidityAndOffsetBuffers(); + } + + final int startOffset = vector.getStartOffset(currentIndex); + final ArrowBuf offsetBuffer = vector.getOffsetBuffer(); + int dataLength = 0; + if (is != null) { - while (currentIndex >= vector.getValueCapacity()) { - vector.reallocValidityAndOffsetBuffers(); - } - final int startOffset = vector.getStartOffset(currentIndex); - final ArrowBuf offsetBuffer = vector.getOffsetBuffer(); - int dataLength = 0; int read; while ((read = is.read(reuseBytes)) != -1) { while (vector.getDataBuffer().capacity() < (startOffset + dataLength + read)) { @@ -66,11 +68,12 @@ public void consume(InputStream is) throws IOException { vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes, 0, read); dataLength += read; } - offsetBuffer.setInt( - (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength); + BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex); - vector.setLastSet(currentIndex); } + offsetBuffer.setInt( + (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength); + vector.setLastSet(currentIndex); } public void moveWriterPosition() { @@ -95,9 +98,7 @@ public NullableBinaryConsumer(VarBinaryVector vector, int index) { @Override public void consume(ResultSet resultSet) throws SQLException, IOException { InputStream is = resultSet.getBinaryStream(columnIndexInResultSet); - if (!resultSet.wasNull()) { - consume(is); - } + consume(is); moveWriterPosition(); } } diff --git a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java index b1e253794d..16a7b50920 100644 --- a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java +++ b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import org.apache.arrow.vector.BaseValueVector; import org.apache.arrow.vector.VarBinaryVector; import org.junit.jupiter.api.Test; @@ -65,7 +66,11 @@ public void testConsumeInputStream(byte[][] values, boolean nullable) throws IOE nullable, binaryConsumer -> { for (byte[] value : values) { - binaryConsumer.consume(new ByteArrayInputStream(value)); + if (value != null) { + binaryConsumer.consume(new ByteArrayInputStream(value)); + } else { + binaryConsumer.consume((InputStream) null); + } binaryConsumer.moveWriterPosition(); } }, @@ -119,5 +124,14 @@ public void testConsumeInputStream() throws IOException { testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT); } testConsumeInputStream(testRecords, false); + byte[] bytes1 = new byte[] {1, 2, 3}; + byte[] bytes2 = new byte[] {4, 5, 6}; + testConsumeInputStream( + new byte[][] { + bytes1, + null, + bytes2 + }, + true); } } From 1718bc7a89aca3a3d0dfe8aba997a329bbf03939 Mon Sep 17 00:00:00 2001 From: wangyunlai Date: Sun, 27 Apr 2025 19:26:59 +0800 Subject: [PATCH 2/2] fix code format --- .../arrow/adapter/jdbc/consumer/BinaryConsumer.java | 2 +- .../arrow/adapter/jdbc/consumer/BinaryConsumerTest.java | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java index 265e968d48..73ec04b8a0 100644 --- a/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java +++ b/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java @@ -72,7 +72,7 @@ public void consume(InputStream is) throws IOException { BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex); } offsetBuffer.setInt( - (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength); + (currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength); vector.setLastSet(currentIndex); } diff --git a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java index 16a7b50920..bb836578e2 100644 --- a/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java +++ b/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java @@ -124,14 +124,9 @@ public void testConsumeInputStream() throws IOException { testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT); } testConsumeInputStream(testRecords, false); + byte[] bytes1 = new byte[] {1, 2, 3}; byte[] bytes2 = new byte[] {4, 5, 6}; - testConsumeInputStream( - new byte[][] { - bytes1, - null, - bytes2 - }, - true); + testConsumeInputStream(new byte[][] {bytes1, null, bytes2}, true); } }