Skip to content

Commit b786c73

Browse files
author
yuzelin
committed
fix
1 parent 47931ad commit b786c73

File tree

2 files changed: 11 additions & 15 deletions

2 files changed

+11
-15
lines changed

paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
import java.io.ByteArrayInputStream;
3434
import java.util.List;
35-
import java.util.Objects;
3635
import java.util.stream.Collectors;
3736

3837
/** Write Arrow bytes to Paimon. */
@@ -53,11 +52,11 @@ public BytesWriter(TableWrite tableWrite, RowType rowType) {
5352
.collect(Collectors.toList());
5453
}
5554

56-
public void write(byte[] bytes, boolean needCheckSchema) throws Exception {
55+
public void write(byte[] bytes) throws Exception {
5756
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
5857
ArrowStreamReader arrowStreamReader = new ArrowStreamReader(bais, allocator);
5958
VectorSchemaRoot vsr = arrowStreamReader.getVectorSchemaRoot();
60-
if (needCheckSchema && !checkSchema(arrowFields, vsr.getSchema().getFields())) {
59+
if (!checkTypesIgnoreNullability(arrowFields, vsr.getSchema().getFields())) {
6160
throw new RuntimeException(
6261
String.format(
6362
"Input schema isn't consistent with table schema.\n"
@@ -79,25 +78,22 @@ public void close() {
7978
allocator.close();
8079
}
8180

82-
private boolean checkSchema(List<Field> expectedFields, List<Field> actualFields) {
81+
private boolean checkTypesIgnoreNullability(
82+
List<Field> expectedFields, List<Field> actualFields) {
8383
if (expectedFields.size() != actualFields.size()) {
8484
return false;
8585
}
8686

8787
for (int i = 0; i < expectedFields.size(); i++) {
8888
Field expectedField = expectedFields.get(i);
8989
Field actualField = actualFields.get(i);
90-
if (!checkFieldIgnoreNullability(expectedField, actualField)
90+
// ArrowType doesn't have nullability (similar to DataTypeRoot)
91+
if (!actualField.getType().equals(expectedField.getType())
9192
|| !checkSchema(expectedField.getChildren(), actualField.getChildren())) {
9293
return false;
9394
}
9495
}
9596

9697
return true;
9798
}
98-
99-
private boolean checkFieldIgnoreNullability(Field expected, Field actual) {
100-
return Objects.equals(expected.getName(), actual.getName())
101-
&& Objects.equals(expected.getType(), actual.getType());
102-
}
10399
}

paimon_python_java/pypaimon.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,21 +219,21 @@ def __init__(self, j_batch_table_write, j_row_type, arrow_schema: pa.Schema):
219219
def write_arrow(self, table):
220220
for record_batch in table.to_reader():
221221
# TODO: can we use a reusable stream in #_write_arrow_batch ?
222-
self._write_arrow_batch(record_batch, True)
222+
self._write_arrow_batch(record_batch)
223223

224224
def write_arrow_batch(self, record_batch):
225-
self._write_arrow_batch(record_batch, True)
225+
self._write_arrow_batch(record_batch)
226226

227227
def write_pandas(self, dataframe: pd.DataFrame):
228228
record_batch = pa.RecordBatch.from_pandas(dataframe, schema=self._arrow_schema)
229-
self._write_arrow_batch(record_batch, False)
229+
self._write_arrow_batch(record_batch)
230230

231-
def _write_arrow_batch(self, record_batch, check_schema):
231+
def _write_arrow_batch(self, record_batch):
232232
stream = pa.BufferOutputStream()
233233
with pa.RecordBatchStreamWriter(stream, record_batch.schema) as writer:
234234
writer.write(record_batch)
235235
arrow_bytes = stream.getvalue().to_pybytes()
236-
self._j_bytes_writer.write(arrow_bytes, check_schema)
236+
self._j_bytes_writer.write(arrow_bytes)
237237

238238
def prepare_commit(self) -> List['CommitMessage']:
239239
j_commit_messages = self._j_batch_table_write.prepareCommit()

0 commit comments

Comments
 (0)