Skip to content

Commit 662917a

Browse files
committed
fix binaryconsume consuming null value
1 parent 0d296df commit 662917a

File tree

2 files changed

+28
-13
lines changed

2 files changed

+28
-13
lines changed

adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ public BinaryConsumer(VarBinaryVector vector, int index) {
5151

5252
/** consume a InputStream. */
5353
public void consume(InputStream is) throws IOException {
54+
while (currentIndex >= vector.getValueCapacity()) {
55+
vector.reallocValidityAndOffsetBuffers();
56+
}
57+
58+
final int startOffset = vector.getStartOffset(currentIndex);
59+
final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
60+
int dataLength = 0;
61+
5462
if (is != null) {
55-
while (currentIndex >= vector.getValueCapacity()) {
56-
vector.reallocValidityAndOffsetBuffers();
57-
}
58-
final int startOffset = vector.getStartOffset(currentIndex);
59-
final ArrowBuf offsetBuffer = vector.getOffsetBuffer();
60-
int dataLength = 0;
6163
int read;
6264
while ((read = is.read(reuseBytes)) != -1) {
6365
while (vector.getDataBuffer().capacity() < (startOffset + dataLength + read)) {
@@ -66,11 +68,12 @@ public void consume(InputStream is) throws IOException {
6668
vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes, 0, read);
6769
dataLength += read;
6870
}
69-
offsetBuffer.setInt(
70-
(currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength);
71+
7172
BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex);
72-
vector.setLastSet(currentIndex);
7373
}
74+
offsetBuffer.setInt(
75+
(currentIndex + 1) * ((long) VarBinaryVector.OFFSET_WIDTH), startOffset + dataLength);
76+
vector.setLastSet(currentIndex);
7477
}
7578

7679
public void moveWriterPosition() {
@@ -95,9 +98,7 @@ public NullableBinaryConsumer(VarBinaryVector vector, int index) {
9598
@Override
9699
public void consume(ResultSet resultSet) throws SQLException, IOException {
97100
InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
98-
if (!resultSet.wasNull()) {
99-
consume(is);
100-
}
101+
consume(is);
101102
moveWriterPosition();
102103
}
103104
}

adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.ByteArrayInputStream;
2424
import java.io.IOException;
25+
import java.io.InputStream;
2526
import org.apache.arrow.vector.BaseValueVector;
2627
import org.apache.arrow.vector.VarBinaryVector;
2728
import org.junit.jupiter.api.Test;
@@ -65,7 +66,11 @@ public void testConsumeInputStream(byte[][] values, boolean nullable) throws IOE
6566
nullable,
6667
binaryConsumer -> {
6768
for (byte[] value : values) {
68-
binaryConsumer.consume(new ByteArrayInputStream(value));
69+
if (value != null) {
70+
binaryConsumer.consume(new ByteArrayInputStream(value));
71+
} else {
72+
binaryConsumer.consume((InputStream) null);
73+
}
6974
binaryConsumer.moveWriterPosition();
7075
}
7176
},
@@ -119,5 +124,14 @@ public void testConsumeInputStream() throws IOException {
119124
testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT);
120125
}
121126
testConsumeInputStream(testRecords, false);
127+
byte[] bytes1 = new byte[] {1, 2, 3};
128+
byte[] bytes2 = new byte[] {4, 5, 6};
129+
testConsumeInputStream(
130+
new byte[][] {
131+
bytes1,
132+
null,
133+
bytes2
134+
},
135+
true);
122136
}
123137
}

0 commit comments

Comments
 (0)