Skip to content

Commit 75d00d7

Browse files
authored
#38 Expose More Metadata in Object APIs (#39)
1 parent 54237c2 commit 75d00d7

File tree

5 files changed

+154
-2
lines changed

5 files changed

+154
-2
lines changed

pypaimon/api/read_builder.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from pypaimon.api import TableRead, TableScan, Predicate, PredicateBuilder
2121
from typing import List
2222

23+
from pypaimon.api.row_type import RowType
24+
2325

2426
class ReadBuilder(ABC):
2527
"""An interface for building the TableScan and TableRead."""
@@ -50,3 +52,10 @@ def new_read(self) -> TableRead:
5052
@abstractmethod
5153
def new_predicate_builder(self) -> PredicateBuilder:
5254
"""Create a builder for Predicate."""
55+
56+
@abstractmethod
57+
def read_type(self) -> RowType:
58+
"""
59+
Return the row type of the builder. If there is a projection inside
60+
the builder, the row type will only contain the selected fields.
61+
"""

pypaimon/api/row_type.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#################################################################################
18+
19+
import pyarrow as pa
20+
21+
from abc import ABC, abstractmethod
22+
23+
24+
class RowType(ABC):
25+
"""Data type of a sequence of fields."""
26+
27+
@abstractmethod
28+
def as_arrow(self) -> "pa.Schema":
29+
"""Return the row type as an Arrow schema."""

pypaimon/api/split.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,20 @@
1616
# limitations under the License.
1717
#################################################################################
1818

19-
from abc import ABC
19+
from abc import ABC, abstractmethod
20+
21+
from typing import Iterator
2022

2123

2224
class Split(ABC):
2325
"""An input split for reading. The most important subclass is DataSplit."""
26+
27+
@abstractmethod
28+
def row_count(self) -> int:
29+
"""Return the total row count of the split."""
30+
31+
def file_size(self) -> int:
32+
"""Return the total file size of the split."""
33+
34+
def file_paths(self) -> Iterator[str]:
35+
"""Return the paths of all raw files in the split."""

pypaimon/py4j/java_implementation.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from pypaimon.py4j.java_gateway import get_gateway
2525
from pypaimon.py4j.util import java_utils, constants
2626
from pypaimon.api import \
27-
(catalog, table, read_builder, table_scan, split,
27+
(catalog, table, read_builder, table_scan, split, row_type,
2828
table_read, write_builder, table_write, commit_message,
2929
table_commit, Schema, predicate)
3030
from typing import List, Iterator, Optional, Any, TYPE_CHECKING
@@ -115,6 +115,18 @@ def new_read(self) -> 'TableRead':
115115
def new_predicate_builder(self) -> 'PredicateBuilder':
116116
return PredicateBuilder(self._j_row_type)
117117

118+
def read_type(self) -> 'RowType':
119+
return RowType(self._j_read_builder.readType())
120+
121+
122+
class RowType(row_type.RowType):
123+
124+
def __init__(self, j_row_type):
125+
self._j_row_type = j_row_type
126+
127+
def as_arrow(self) -> "pa.Schema":
128+
return java_utils.to_arrow_schema(self._j_row_type)
129+
118130

119131
class TableScan(table_scan.TableScan):
120132

@@ -144,6 +156,23 @@ def __init__(self, j_split):
144156
def to_j_split(self):
145157
return self._j_split
146158

159+
def row_count(self) -> int:
160+
return self._j_split.rowCount()
161+
162+
def file_size(self) -> int:
163+
files_optional = self._j_split.convertToRawFiles()
164+
if not files_optional.isPresent():
165+
return 0
166+
files = files_optional.get()
167+
return sum(file.length() for file in files)
168+
169+
def file_paths(self) -> List[str]:
170+
files_optional = self._j_split.convertToRawFiles()
171+
if not files_optional.isPresent():
172+
return []
173+
files = files_optional.get()
174+
return [file.path() for file in files]
175+
147176

148177
class TableRead(table_read.TableRead):
149178

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
import os
20+
import pyarrow as pa
21+
22+
from pypaimon import Schema
23+
from pypaimon.py4j.tests import PypaimonTestBase
24+
25+
26+
class ObjectInfoTest(PypaimonTestBase):
27+
28+
@classmethod
29+
def setUpClass(cls):
30+
super().setUpClass()
31+
cls.simple_pa_schema = pa.schema([
32+
('f0', pa.int32()),
33+
('f1', pa.string())
34+
])
35+
36+
def test_read_type_metadata(self):
37+
schema = Schema(self.simple_pa_schema)
38+
self.catalog.create_table('default.test_read_type_metadata', schema, False)
39+
table = self.catalog.get_table('default.test_read_type_metadata')
40+
41+
read_builder = table.new_read_builder()
42+
read_builder.with_projection(['f1'])
43+
pa_schema = read_builder.read_type().as_arrow()
44+
45+
self.assertEqual(len(pa_schema.names), 1)
46+
self.assertEqual(pa_schema.names[0], 'f1')
47+
48+
def test_split_metadata(self):
49+
schema = Schema(self.simple_pa_schema)
50+
self.catalog.create_table('default.test_split_metadata', schema, False)
51+
table = self.catalog.get_table('default.test_split_metadata')
52+
53+
write_builder = table.new_batch_write_builder()
54+
table_write = write_builder.new_write()
55+
table_commit = write_builder.new_commit()
56+
data = {
57+
'f0': [1, 2, 3, 4, 5],
58+
'f1': ['a', 'b', 'c', 'd', 'e'],
59+
}
60+
pa_table = pa.Table.from_pydict(data, schema=self.simple_pa_schema)
61+
table_write.write_arrow(pa_table)
62+
table_commit.commit(table_write.prepare_commit())
63+
table_write.close()
64+
table_commit.close()
65+
read_builder = table.new_read_builder()
66+
table_scan = read_builder.new_scan()
67+
splits = table_scan.plan().splits()
68+
69+
self.assertEqual(len(splits), 1)
70+
self.assertEqual(len(splits[0].file_paths()), 1)
71+
self.assertEqual(splits[0].row_count(), 5)
72+
self.assertTrue(splits[0].file_paths()[0].endswith('.parquet'))
73+
self.assertEqual(splits[0].file_size(), os.path.getsize(splits[0].file_paths()[0]))

0 commit comments

Comments
 (0)