Skip to content

Commit 03108ec

Browse files
authored
Add read API to convert result to DuckDB and Ray (#28)
1 parent f09dc58 commit 03108ec

File tree

4 files changed

+70
-12
lines changed

4 files changed

+70
-12
lines changed

dev/dev-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ numpy>=1.22.4
2626
python-dateutil>=2.8.0,<3
2727
pytz>=2018.3
2828
pytest~=7.0
29+
duckdb>=0.5.0,<2.0.0
30+
ray~=2.10.0

paimon_python_api/table_read.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import pandas as pd
2020
import pyarrow as pa
21+
import ray
2122

2223
from abc import ABC, abstractmethod
24+
from duckdb.duckdb import DuckDBPyConnection
2325
from paimon_python_api import Split
24-
from typing import List
26+
from typing import List, Optional
2527

2628

2729
class TableRead(ABC):
@@ -38,3 +40,15 @@ def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader:
3840
@abstractmethod
3941
def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
4042
"""Read data from splits and converted to pandas.DataFrame format."""
43+
44+
@abstractmethod
45+
def to_duckdb(
46+
self,
47+
splits: List[Split],
48+
table_name: str,
49+
connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
50+
"""Convert splits into an in-memory DuckDB table which can be queried."""
51+
52+
@abstractmethod
53+
def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
54+
"""Convert splits into a Ray dataset format."""

paimon_python_java/pypaimon.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
# limitations under the License.
1717
################################################################################
1818

19+
import duckdb
1920
import pandas as pd
2021
import pyarrow as pa
22+
import ray
2123

24+
from duckdb.duckdb import DuckDBPyConnection
2225
from paimon_python_java.java_gateway import get_gateway
2326
from paimon_python_java.util import java_utils, constants
2427
from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
@@ -161,6 +164,18 @@ def to_arrow_batch_reader(self, splits):
161164
def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
162165
return self.to_arrow(splits).to_pandas()
163166

167+
def to_duckdb(
168+
self,
169+
splits: List[Split],
170+
table_name: str,
171+
connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
172+
con = connection or duckdb.connect(database=":memory:")
173+
con.register(table_name, self.to_arrow(splits))
174+
return con
175+
176+
def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
177+
return ray.data.from_arrow(self.to_arrow(splits))
178+
164179
def _init(self):
165180
if self._j_bytes_reader is None:
166181
# get thread num

paimon_python_java/tests/test_write_and_read.py

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,17 +267,20 @@ def testAllWriteAndReadApi(self):
267267
table_write.close()
268268
table_commit.close()
269269

270+
all_data = pd.DataFrame({
271+
'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
272+
'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
273+
})
274+
all_data['f0'] = all_data['f0'].astype('int32')
275+
270276
read_builder = table.new_read_builder()
271277
table_scan = read_builder.new_scan()
272278
table_read = read_builder.new_read()
273279
splits = table_scan.plan().splits()
274280

275281
# to_arrow
276282
actual = table_read.to_arrow(splits)
277-
expected = pa.Table.from_pydict({
278-
'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
279-
'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
280-
}, schema=self.simple_pa_schema)
283+
expected = pa.Table.from_pandas(all_data, schema=self.simple_pa_schema)
281284
self.assertEqual(actual, expected)
282285

283286
# to_arrow_batch_reader
@@ -286,18 +289,42 @@ def testAllWriteAndReadApi(self):
286289
for batch in table_read.to_arrow_batch_reader(splits)
287290
]
288291
actual = pd.concat(data_frames)
289-
expected = pd.DataFrame({
290-
'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
291-
'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
292-
})
293-
expected['f0'] = expected['f0'].astype('int32')
294292
pd.testing.assert_frame_equal(
295-
actual.reset_index(drop=True), expected.reset_index(drop=True))
293+
actual.reset_index(drop=True), all_data.reset_index(drop=True))
296294

297295
# to_pandas
298296
actual = table_read.to_pandas(splits)
299297
pd.testing.assert_frame_equal(
300-
actual.reset_index(drop=True), expected.reset_index(drop=True))
298+
actual.reset_index(drop=True), all_data.reset_index(drop=True))
299+
300+
# to_duckdb
301+
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
302+
# select *
303+
result1 = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
304+
pd.testing.assert_frame_equal(
305+
result1.reset_index(drop=True), all_data.reset_index(drop=True))
306+
# select * where
307+
result2 = duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 < 4").fetchdf()
308+
expected2 = pd.DataFrame({
309+
'f0': [1, 2, 3],
310+
'f1': ['a', 'b', 'c']
311+
})
312+
expected2['f0'] = expected2['f0'].astype('int32')
313+
pd.testing.assert_frame_equal(
314+
result2.reset_index(drop=True), expected2.reset_index(drop=True))
315+
# select f0 where
316+
result3 = duckdb_con.query("SELECT f0 FROM duckdb_table WHERE f0 < 4").fetchdf()
317+
expected3 = pd.DataFrame({
318+
'f0': [1, 2, 3]
319+
})
320+
expected3['f0'] = expected3['f0'].astype('int32')
321+
pd.testing.assert_frame_equal(
322+
result3.reset_index(drop=True), expected3.reset_index(drop=True))
323+
324+
# to_ray
325+
ray_dataset = table_read.to_ray(splits)
326+
pd.testing.assert_frame_equal(
327+
ray_dataset.to_pandas().reset_index(drop=True), all_data.reset_index(drop=True))
301328

302329
def test_overwrite(self):
303330
schema = Schema(self.simple_pa_schema, partition_keys=['f0'],

0 commit comments

Comments (0)