Commit 7703bcd

Add tests for new reader and writer api (#17)
1 parent 953f30f commit 7703bcd

File tree

1 file changed: +78 -0 lines changed

paimon_python_java/tests/test_write_and_read.py

Lines changed: 78 additions & 0 deletions
@@ -219,3 +219,81 @@ def testParallelRead(self):
         # check data (ignore index)
         pd.testing.assert_frame_equal(
             result.reset_index(drop=True), expected_df.reset_index(drop=True))
+
+    def testAllWriteAndReadApi(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_all_api', schema, False)
+        table = self.catalog.get_table('default.test_all_api')
+        write_builder = table.new_batch_write_builder()
+
+        # write_arrow
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=self.simple_pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write_arrow_batch
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data2 = {
+            'f0': [4, 5, 6],
+            'f1': ['d', 'e', 'f'],
+        }
+        df = pd.DataFrame(data2)
+        record_batch = pa.RecordBatch.from_pandas(df, schema=self.simple_pa_schema)
+        table_write.write_arrow_batch(record_batch)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # write_pandas
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data3 = {
+            'f0': [7, 8, 9],
+            'f1': ['g', 'h', 'i'],
+        }
+        df = pd.DataFrame(data3)
+        table_write.write_pandas(df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+
+        # to_arrow
+        actual = table_read.to_arrow(splits)
+        expected = pa.Table.from_pydict({
+            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
+            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
+        }, schema=self.simple_pa_schema)
+        self.assertEqual(actual, expected)
+
+        # to_arrow_batch_reader
+        data_frames = [
+            batch.to_pandas()
+            for batch in table_read.to_arrow_batch_reader(splits)
+        ]
+        actual = pd.concat(data_frames)
+        expected = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
+            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
+        })
+        expected['f0'] = expected['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual.reset_index(drop=True), expected.reset_index(drop=True))
+
+        # to_pandas
+        actual = table_read.to_pandas(splits)
+        pd.testing.assert_frame_equal(
+            actual.reset_index(drop=True), expected.reset_index(drop=True))

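For quick reference, the write/read cycle the new test exercises condenses to the sketch below. This is a minimal sketch, not a drop-in script: Schema, catalog and pa_schema stand in for the test's own setup (self.catalog, self.simple_pa_schema), and the table name 'default.demo' is a hypothetical example; the method calls themselves are the ones shown in the diff.

    # Condensed sketch of the batch write -> read round trip covered by the test.
    # Assumes catalog, Schema and pa_schema are provided by the same setup the
    # test fixture uses; 'default.demo' is a hypothetical table name.
    import pyarrow as pa

    schema = Schema(pa_schema)
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    # Write: each batch uses its own write/commit pair; write_arrow,
    # write_arrow_batch and write_pandas all feed the same table.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(
        pa.Table.from_pydict({'f0': [1], 'f1': ['a']}, schema=pa_schema))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    # Read: splits come from a scan plan; to_arrow, to_arrow_batch_reader
    # and to_pandas all consume the same splits.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    result_df = read_builder.new_read().to_pandas(splits)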