Skip to content

Commit 9fee9b6

Browse files
authored
Fix overwrite without partition (#18)
1 parent 7703bcd commit 9fee9b6

File tree

3 files changed

+80
-3
lines changed

3 files changed

+80
-3
lines changed

paimon_python_api/write_builder.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818

1919
from abc import ABC, abstractmethod
2020
from paimon_python_api import BatchTableCommit, BatchTableWrite
21+
from typing import Optional
2122

2223

2324
class BatchWriteBuilder(ABC):
2425
"""An interface for building the TableScan and TableRead."""
2526

2627
@abstractmethod
27-
def with_overwrite(self, static_partition: dict) -> 'BatchWriteBuilder':
28+
def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuilder':
2829
"""
2930
Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL.
30-
If you pass an empty dict, it means OVERWRITE whole table.
31+
If you pass None, it means OVERWRITE whole table.
3132
"""
3233

3334
@abstractmethod

paimon_python_java/pypaimon.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ def __init__(self, j_batch_write_builder, j_row_type, arrow_schema: pa.Schema):
185185
self._j_row_type = j_row_type
186186
self._arrow_schema = arrow_schema
187187

188-
def with_overwrite(self, static_partition: dict) -> 'BatchWriteBuilder':
188+
def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuilder':
189+
if static_partition is None:
190+
static_partition = {}
189191
self._j_batch_write_builder.withOverwrite(static_partition)
190192
return self
191193

paimon_python_java/tests/test_write_and_read.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,3 +297,77 @@ def testAllWriteAndReadApi(self):
297297
actual = table_read.to_pandas(splits)
298298
pd.testing.assert_frame_equal(
299299
actual.reset_index(drop=True), expected.reset_index(drop=True))
300+
301+
def test_overwrite(self):
302+
schema = Schema(self.simple_pa_schema, partition_keys=['f0'],
303+
options={'dynamic-partition-overwrite': 'false'})
304+
self.catalog.create_table('default.test_overwrite', schema, False)
305+
table = self.catalog.get_table('default.test_overwrite')
306+
read_builder = table.new_read_builder()
307+
308+
write_builder = table.new_batch_write_builder()
309+
table_write = write_builder.new_write()
310+
table_commit = write_builder.new_commit()
311+
312+
df0 = pd.DataFrame({
313+
'f0': [1, 1, 2, 2],
314+
'f1': ['apple', 'banana', 'dog', 'cat'],
315+
})
316+
317+
table_write.write_pandas(df0)
318+
table_commit.commit(table_write.prepare_commit())
319+
table_write.close()
320+
table_commit.close()
321+
322+
table_scan = read_builder.new_scan()
323+
table_read = read_builder.new_read()
324+
actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
325+
df0['f0'] = df0['f0'].astype('int32')
326+
pd.testing.assert_frame_equal(
327+
actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
328+
329+
write_builder = table.new_batch_write_builder().overwrite({'f0': '1'})
330+
table_write = write_builder.new_write()
331+
table_commit = write_builder.new_commit()
332+
333+
df1 = pd.DataFrame({
334+
'f0': [1],
335+
'f1': ['watermelon'],
336+
})
337+
338+
table_write.write_pandas(df1)
339+
table_commit.commit(table_write.prepare_commit())
340+
table_write.close()
341+
table_commit.close()
342+
343+
table_scan = read_builder.new_scan()
344+
table_read = read_builder.new_read()
345+
actual_df1 = table_read.to_pandas(table_scan.plan().splits())
346+
expected_df1 = pd.DataFrame({
347+
'f0': [2, 2, 1],
348+
'f1': ['dog', 'cat', 'watermelon']
349+
})
350+
expected_df1['f0'] = expected_df1['f0'].astype('int32')
351+
pd.testing.assert_frame_equal(
352+
actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
353+
354+
write_builder = table.new_batch_write_builder().overwrite()
355+
table_write = write_builder.new_write()
356+
table_commit = write_builder.new_commit()
357+
358+
df2 = pd.DataFrame({
359+
'f0': [3],
360+
'f1': ['Neo'],
361+
})
362+
363+
table_write.write_pandas(df2)
364+
table_commit.commit(table_write.prepare_commit())
365+
table_write.close()
366+
table_commit.close()
367+
368+
table_scan = read_builder.new_scan()
369+
table_read = read_builder.new_read()
370+
actual_df2 = table_read.to_pandas(table_scan.plan().splits())
371+
df2['f0'] = df2['f0'].astype('int32')
372+
pd.testing.assert_frame_equal(
373+
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

0 commit comments

Comments
 (0)