Skip to content

Commit bc43a87

Browse files
authored
Merge branch 'apache:main' into main
2 parents cef8415 + 34060eb commit bc43a87

23 files changed

+491
-23
lines changed

adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.arrow.util.AutoCloseables;
2525
import org.apache.arrow.vector.ValueVector;
2626
import org.apache.arrow.vector.VectorSchemaRoot;
27-
import org.apache.arrow.vector.types.pojo.ArrowType;
2827

2928
/** Composite consumer which hold all consumers. It manages the consume and cleanup process. */
3029
public class CompositeJdbcConsumer implements JdbcConsumer {
@@ -46,9 +45,9 @@ public void consume(ResultSet rs) throws SQLException, IOException {
4645
BaseConsumer consumer = (BaseConsumer) consumers[i];
4746
JdbcFieldInfo fieldInfo =
4847
new JdbcFieldInfo(rs.getMetaData(), consumer.columnIndexInResultSet);
49-
ArrowType arrowType = consumer.vector.getMinorType().getType();
48+
5049
throw new JdbcConsumerException(
51-
"Exception while consuming JDBC value", e, fieldInfo, arrowType);
50+
"Exception while consuming JDBC value", e, fieldInfo, consumer.vector.getField());
5251
} else {
5352
throw e;
5453
}

adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/exceptions/JdbcConsumerException.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,33 @@
1717
package org.apache.arrow.adapter.jdbc.consumer.exceptions;
1818

1919
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
20-
import org.apache.arrow.vector.types.pojo.ArrowType;
20+
import org.apache.arrow.vector.types.pojo.Field;
2121

2222
/**
2323
* Exception while consuming JDBC data. This exception stores the JdbcFieldInfo for the column and
2424
* the ArrowType for the corresponding vector for easier debugging.
2525
*/
2626
public class JdbcConsumerException extends RuntimeException {
2727
final JdbcFieldInfo fieldInfo;
28-
final ArrowType arrowType;
28+
final Field field;
2929

3030
/**
3131
* Construct JdbcConsumerException with all fields.
3232
*
3333
* @param message error message
3434
* @param cause original exception
3535
* @param fieldInfo JdbcFieldInfo for the column
36-
* @param arrowType ArrowType for the corresponding vector
36+
* @param field ArrowType for the corresponding vector
3737
*/
3838
public JdbcConsumerException(
39-
String message, Throwable cause, JdbcFieldInfo fieldInfo, ArrowType arrowType) {
39+
String message, Throwable cause, JdbcFieldInfo fieldInfo, Field field) {
4040
super(message, cause);
4141
this.fieldInfo = fieldInfo;
42-
this.arrowType = arrowType;
42+
this.field = field;
4343
}
4444

45-
public ArrowType getArrowType() {
46-
return this.arrowType;
45+
public Field getField() {
46+
return this.field;
4747
}
4848

4949
public JdbcFieldInfo getFieldInfo() {

vector/src/main/codegen/includes/vv_imports.ftl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.arrow.vector.complex.*;
3434
import org.apache.arrow.vector.complex.reader.*;
3535
import org.apache.arrow.vector.complex.impl.*;
3636
import org.apache.arrow.vector.complex.writer.*;
37+
import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
3738
import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
3839
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
3940
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;

vector/src/main/codegen/templates/AbstractFieldReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ public void copyAsField(String name, ${name}Writer writer) {
109109

110110
</#list></#list>
111111

112+
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory) {
113+
fail("CopyAsValue StructWriter");
114+
}
115+
112116
public void read(ExtensionHolder holder) {
113117
fail("Extension");
114118
}

vector/src/main/codegen/templates/BaseReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public interface RepeatedStructReader extends StructReader{
4949
boolean next();
5050
int size();
5151
void copyAsValue(StructWriter writer);
52+
void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory);
5253
}
5354

5455
public interface ListReader extends BaseReader{
@@ -59,6 +60,7 @@ public interface RepeatedListReader extends ListReader{
5960
boolean next();
6061
int size();
6162
void copyAsValue(ListWriter writer);
63+
void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory);
6264
}
6365

6466
public interface MapReader extends BaseReader{
@@ -69,6 +71,7 @@ public interface RepeatedMapReader extends MapReader{
6971
boolean next();
7072
int size();
7173
void copyAsValue(MapWriter writer);
74+
void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory);
7275
}
7376

7477
public interface ScalarReader extends

vector/src/main/codegen/templates/ComplexCopier.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,14 @@ public class ComplexCopier {
4242
* @param output field to write to
4343
*/
4444
public static void copy(FieldReader input, FieldWriter output) {
45-
writeValue(input, output);
45+
writeValue(input, output, null);
4646
}
4747

48-
private static void writeValue(FieldReader reader, FieldWriter writer) {
48+
public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
49+
writeValue(input, output, extensionTypeWriterFactory);
50+
}
51+
52+
private static void writeValue(FieldReader reader, FieldWriter writer, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
4953
final MinorType mt = reader.getMinorType();
5054

5155
switch (mt) {
@@ -61,7 +65,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
6165
FieldReader childReader = reader.reader();
6266
FieldWriter childWriter = getListWriterForReader(childReader, writer);
6367
if (childReader.isSet()) {
64-
writeValue(childReader, childWriter);
68+
writeValue(childReader, childWriter, extensionTypeWriterFactory);
6569
} else {
6670
childWriter.writeNull();
6771
}
@@ -79,8 +83,8 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
7983
FieldReader structReader = reader.reader();
8084
if (structReader.isSet()) {
8185
writer.startEntry();
82-
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()));
83-
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()));
86+
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()), extensionTypeWriterFactory);
87+
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()), extensionTypeWriterFactory);
8488
writer.endEntry();
8589
} else {
8690
writer.writeNull();
@@ -99,7 +103,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
99103
if (childReader.getMinorType() != Types.MinorType.NULL) {
100104
FieldWriter childWriter = getStructWriterForReader(childReader, writer, name);
101105
if (childReader.isSet()) {
102-
writeValue(childReader, childWriter);
106+
writeValue(childReader, childWriter, extensionTypeWriterFactory);
103107
} else {
104108
childWriter.writeNull();
105109
}
@@ -110,6 +114,20 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
110114
writer.writeNull();
111115
}
112116
break;
117+
case EXTENSIONTYPE:
118+
if (extensionTypeWriterFactory == null) {
119+
throw new IllegalArgumentException("Must provide ExtensionTypeWriterFactory");
120+
}
121+
if (reader.isSet()) {
122+
Object value = reader.readObject();
123+
if (value != null) {
124+
writer.addExtensionTypeWriterFactory(extensionTypeWriterFactory);
125+
writer.writeExtension(value);
126+
}
127+
} else {
128+
writer.writeNull();
129+
}
130+
break;
113131
<#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
114132
<#assign fields = minor.fields!type.fields />
115133
<#assign uncappedName = name?uncap_first/>
@@ -162,6 +180,9 @@ private static FieldWriter getStructWriterForReader(FieldReader reader, StructWr
162180
return (FieldWriter) writer.map(name);
163181
case LISTVIEW:
164182
return (FieldWriter) writer.listView(name);
183+
case EXTENSIONTYPE:
184+
ExtensionWriter extensionWriter = writer.extension(name, reader.getField().getType());
185+
return (FieldWriter) extensionWriter;
165186
default:
166187
throw new UnsupportedOperationException(reader.getMinorType().toString());
167188
}
@@ -186,6 +207,9 @@ private static FieldWriter getListWriterForReader(FieldReader reader, ListWriter
186207
return (FieldWriter) writer.list();
187208
case LISTVIEW:
188209
return (FieldWriter) writer.listView();
210+
case EXTENSIONTYPE:
211+
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
212+
return (FieldWriter) extensionWriter;
189213
default:
190214
throw new UnsupportedOperationException(reader.getMinorType().toString());
191215
}
@@ -211,6 +235,9 @@ private static FieldWriter getMapWriterForReader(FieldReader reader, MapWriter w
211235
return (FieldWriter) writer.listView();
212236
case MAP:
213237
return (FieldWriter) writer.map(false);
238+
case EXTENSIONTYPE:
239+
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
240+
return (FieldWriter) extensionWriter;
214241
default:
215242
throw new UnsupportedOperationException(reader.getMinorType().toString());
216243
}

vector/src/main/codegen/templates/NullReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void read(int arrayIndex, Nullable${name}Holder holder){
8686
}
8787
</#list></#list>
8888

89+
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory){}
8990
public void read(ExtensionHolder holder) {
9091
holder.isSet = 0;
9192
}

vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.arrow.memory.BufferAllocator;
2323
import org.apache.arrow.memory.ReferenceManager;
2424
import org.apache.arrow.util.Preconditions;
25+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2526
import org.apache.arrow.vector.complex.reader.FieldReader;
2627
import org.apache.arrow.vector.util.DataSizeRoundingUtil;
2728
import org.apache.arrow.vector.util.TransferPair;
@@ -260,6 +261,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
260261
throw new UnsupportedOperationException();
261262
}
262263

264+
@Override
265+
public void copyFrom(
266+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
267+
throw new UnsupportedOperationException();
268+
}
269+
270+
@Override
271+
public void copyFromSafe(
272+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
273+
throw new UnsupportedOperationException();
274+
}
275+
263276
/**
264277
* Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
265278
* Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte

vector/src/main/java/org/apache/arrow/vector/NullVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
2828
import org.apache.arrow.util.Preconditions;
2929
import org.apache.arrow.vector.compare.VectorVisitor;
30+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
3031
import org.apache.arrow.vector.complex.impl.NullReader;
3132
import org.apache.arrow.vector.complex.reader.FieldReader;
3233
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -329,6 +330,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
329330
throw new UnsupportedOperationException();
330331
}
331332

333+
@Override
334+
public void copyFrom(
335+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
336+
throw new UnsupportedOperationException();
337+
}
338+
339+
@Override
340+
public void copyFromSafe(
341+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
342+
throw new UnsupportedOperationException();
343+
}
344+
332345
@Override
333346
public String getName() {
334347
return this.getField().getName();

vector/src/main/java/org/apache/arrow/vector/ValueVector.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.arrow.memory.OutOfMemoryException;
2323
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
2424
import org.apache.arrow.vector.compare.VectorVisitor;
25+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2526
import org.apache.arrow.vector.complex.reader.FieldReader;
2627
import org.apache.arrow.vector.types.Types.MinorType;
2728
import org.apache.arrow.vector.types.pojo.Field;
@@ -309,6 +310,30 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
309310
*/
310311
void copyFromSafe(int fromIndex, int thisIndex, ValueVector from);
311312

313+
/**
314+
* Copy a cell value from a particular index in source vector to a particular position in this
315+
* vector.
316+
*
317+
* @param fromIndex position to copy from in source vector
318+
* @param thisIndex position to copy to in this vector
319+
* @param from source vector
320+
* @param writerFactory the extension type writer factory to use for copying extension type values
321+
*/
322+
void copyFrom(
323+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
324+
325+
/**
326+
* Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
327+
* capacity of the vector needs to be expanded before copy.
328+
*
329+
* @param fromIndex position to copy from in source vector
330+
* @param thisIndex position to copy to in this vector
331+
* @param from source vector
332+
* @param writerFactory the extension type writer factory to use for copying extension type values
333+
*/
334+
void copyFromSafe(
335+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
336+
312337
/**
313338
* Accept a generic {@link VectorVisitor} and return the result.
314339
*

0 commit comments

Comments
 (0)