diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index c8620af732..15931d02fb 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -370,7 +370,165 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
 summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
 ```
 
-### Add Files
+### Entries
+
+To show all the table's current manifest entries for both data and delete files:
+
+```python
+table.inspect.entries()
+```
+
+```
+pyarrow.Table
+status: int8 not null
+snapshot_id: int64 not null
+sequence_number: int64 not null
+file_sequence_number: int64 not null
+data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not null
+  child 0, content: int8 not null
+  child 1, file_path: string not null
+  child 2, file_format: string not null
+  child 3, partition: struct<> not null
+  child 4, record_count: int64 not null
+  child 5, file_size_in_bytes: int64 not null
+  child 6, column_sizes: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 7, value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 8, null_value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 9, nan_value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 10, lower_bounds: map<int32, binary>
+      child 0, entries: struct<key: int32 not null, value: binary> not null
+          child 0, key: int32 not null
+          child 1, value: binary
+  child 11, upper_bounds: map<int32, binary>
+      child 0, entries: struct<key: int32 not null, value: binary> not null
+          child 0, key: int32 not null
+          child 1, value: binary
+  child 12, key_metadata: binary
+  child 13, split_offsets: list<item: int64>
+      child 0, item: int64
+  child 14, equality_ids: list<item: int32>
+      child 0, item: int32
+  child 15, sort_order_id: int32
+readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
+  child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: string
+      child 5, upper_bound: string
+  child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+  child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+----
+status: [[1]]
+snapshot_id: [[6245626162224016531]]
+sequence_number: [[1]]
+file_sequence_number: [[1]]
+data_file: [
+  -- is_valid: all not null
+  -- child 0 type: int8
+[0]
+  -- child 1 type: string
+["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
+  -- child 2 type: string
+["PARQUET"]
+  -- child 3 type: struct<>
+    -- is_valid: all not null
+  -- child 4 type: int64
+[4]
+  -- child 5 type: int64
+[1656]
+  -- child 6 type: map<int32, int64>
+[keys:[1,2,3]values:[140,135,135]]
+  -- child 7 type: map<int32, int64>
+[keys:[1,2,3]values:[4,4,4]]
+  -- child 8 type: map<int32, int64>
+[keys:[1,2,3]values:[0,0,0]]
+  -- child 9 type: map<int32, int64>
+[keys:[]values:[]]
+  -- child 10 type: map<int32, binary>
+[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
+  -- child 11 type: map<int32, binary>
+[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
+  -- child 12 type: binary
+[null]
+  -- child 13 type: list<item: int64>
+[[4]]
+  -- child 14 type: list<item: int32>
+[null]
+  -- child 15 type: int32
+[null]]
+readable_metrics: [
+  -- is_valid: all not null
+  -- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[140]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: string
+["Amsterdam"]
+    -- child 5 type: string
+["San Francisco"]
+  -- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[37.773972]
+    -- child 5 type: double
+[53.11254]
+  -- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[-122.431297]
+    -- child 5 type: double
+[6.0989]]
+```
+
+## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
 
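A minimal usage sketch for the `inspect.entries()` table documented above, not part of the change itself. The catalog name `default` and the `default.cities` table are assumptions taken from the example output, and the status codes follow the Iceberg manifest-entry spec (0 = EXISTING, 1 = ADDED, 2 = DELETED).

```python
import pyarrow.compute as pc

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")             # assumed catalog name
table = catalog.load_table("default.cities")  # table from the example output above

entries = table.inspect.entries()  # a pyarrow.Table

# Keep only ADDED manifest entries (0 = EXISTING, 1 = ADDED, 2 = DELETED)
added = entries.filter(pc.equal(entries["status"], 1))

# readable_metrics carries per-column bounds decoded from the raw binary
# lower_bounds/upper_bounds maps of data_file
for row in added.column("readable_metrics").to_pylist():
    print(row["city"]["lower_bound"], row["city"]["upper_bound"])
```

Calling `entries.to_pandas()` works as well when pandas is installed, which is what the integration tests below rely on.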
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 0f113f3b5e..e183d82775 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -47,6 +47,7 @@
 from typing_extensions import Annotated
 
 import pyiceberg.expressions.parser as parser
+from pyiceberg.conversions import from_bytes
 from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
 from pyiceberg.expressions import (
     AlwaysTrue,
@@ -3264,3 +3265,126 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+    def entries(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        schema = self.tbl.metadata.schema()
+
+        readable_metrics_struct = []
+
+        def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
+            pa_bound_type = schema_to_pyarrow(bound_type)
+            return pa.struct([
+                pa.field("column_size", pa.int64(), nullable=True),
+                pa.field("value_count", pa.int64(), nullable=True),
+                pa.field("null_value_count", pa.int64(), nullable=True),
+                pa.field("nan_value_count", pa.int64(), nullable=True),
+                pa.field("lower_bound", pa_bound_type, nullable=True),
+                pa.field("upper_bound", pa_bound_type, nullable=True),
+            ])
+
+        for field in self.tbl.metadata.schema().fields:
+            readable_metrics_struct.append(
+                pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
+            )
+
+        partition_record = self.tbl.metadata.specs_struct()
+        pa_record_struct = schema_to_pyarrow(partition_record)
+
+        entries_schema = pa.schema([
+            pa.field('status', pa.int8(), nullable=False),
+            pa.field('snapshot_id', pa.int64(), nullable=False),
+            pa.field('sequence_number', pa.int64(), nullable=False),
+            pa.field('file_sequence_number', pa.int64(), nullable=False),
+            pa.field(
+                'data_file',
+                pa.struct([
+                    pa.field('content', pa.int8(), nullable=False),
+                    pa.field('file_path', pa.string(), nullable=False),
+                    pa.field('file_format', pa.string(), nullable=False),
+                    pa.field('partition', pa_record_struct, nullable=False),
+                    pa.field('record_count', pa.int64(), nullable=False),
+                    pa.field('file_size_in_bytes', pa.int64(), nullable=False),
+                    pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
+                    pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
+                    pa.field('key_metadata', pa.binary(), nullable=True),
+                    pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
+                    pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
+                    pa.field('sort_order_id', pa.int32(), nullable=True),
+                ]),
+                nullable=False,
+            ),
+            pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
+        ])
+
+        entries = []
+        if snapshot := self.tbl.metadata.current_snapshot():
+            for manifest in snapshot.manifests(self.tbl.io):
+                for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+                    column_sizes = entry.data_file.column_sizes or {}
+                    value_counts = entry.data_file.value_counts or {}
+                    null_value_counts = entry.data_file.null_value_counts or {}
+                    nan_value_counts = entry.data_file.nan_value_counts or {}
+                    lower_bounds = entry.data_file.lower_bounds or {}
+                    upper_bounds = entry.data_file.upper_bounds or {}
+                    readable_metrics = {
+                        schema.find_column_name(field.field_id): {
+                            "column_size": column_sizes.get(field.field_id),
+                            "value_count": value_counts.get(field.field_id),
+                            "null_value_count": null_value_counts.get(field.field_id),
+                            "nan_value_count": nan_value_counts.get(field.field_id),
+                            # Makes them readable
+                            "lower_bound": from_bytes(field.field_type, lower_bound)
+                            if (lower_bound := lower_bounds.get(field.field_id))
+                            else None,
+                            "upper_bound": from_bytes(field.field_type, upper_bound)
+                            if (upper_bound := upper_bounds.get(field.field_id))
+                            else None,
+                        }
+                        for field in self.tbl.metadata.schema().fields
+                    }
+
+                    partition = entry.data_file.partition
+                    partition_record_dict = {
+                        field.name: partition[pos]
+                        for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+                    }
+
+                    entries.append({
+                        'status': entry.status.value,
+                        'snapshot_id': entry.snapshot_id,
+                        'sequence_number': entry.data_sequence_number,
+                        'file_sequence_number': entry.file_sequence_number,
+                        'data_file': {
+                            "content": entry.data_file.content,
+                            "file_path": entry.data_file.file_path,
+                            "file_format": entry.data_file.file_format,
+                            "partition": partition_record_dict,
+                            "record_count": entry.data_file.record_count,
+                            "file_size_in_bytes": entry.data_file.file_size_in_bytes,
+                            "column_sizes": dict(entry.data_file.column_sizes),
+                            "value_counts": dict(entry.data_file.value_counts),
+                            "null_value_counts": dict(entry.data_file.null_value_counts),
+                            "nan_value_counts": entry.data_file.nan_value_counts,
+                            "lower_bounds": entry.data_file.lower_bounds,
+                            "upper_bounds": entry.data_file.upper_bounds,
+                            "key_metadata": entry.data_file.key_metadata,
+                            "split_offsets": entry.data_file.split_offsets,
+                            "equality_ids": entry.data_file.equality_ids,
+                            "sort_order_id": entry.data_file.sort_order_id,
+                            "spec_id": entry.data_file.spec_id,
+                        },
+                        'readable_metrics': readable_metrics,
+                    })
+
+        return pa.Table.from_pylist(
+            entries,
+            schema=entries_schema,
+        )
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 2e20c5092f..21ed144784 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -49,7 +49,7 @@
     IcebergRootModel,
     Properties,
 )
-from pyiceberg.types import transform_dict_value_to_str
+from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import datetime_to_millis
 
@@ -245,6 +245,31 @@ def specs(self) -> Dict[int, PartitionSpec]:
         """Return a dict the partition specs this table."""
         return {spec.spec_id: spec for spec in self.partition_specs}
 
+    def specs_struct(self) -> StructType:
+        """Produce a struct of all the combined PartitionSpecs.
+
+        The partition fields should be optional: Partition fields may be added later,
+        in which case not all files would have the result field, and it may be null.
+
+        :return: A StructType that represents all the combined PartitionSpecs of the table
+        """
+        specs = self.specs()
+
+        # Collect all the fields
+        struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}
+
+        schema = self.schema()
+
+        nested_fields = []
+        # Sort them by field_id in order to get a deterministic output
+        for field_id in sorted(struct_fields):
+            field = struct_fields[field_id]
+            source_type = schema.find_type(field.source_id)
+            result_type = field.transform.result_type(source_type)
+            nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))
+
+        return StructType(*nested_fields)
+
     def new_snapshot_id(self) -> int:
         """Generate a new snapshot-id that's not in use."""
         snapshot_id = _generate_snapshot_id()
diff --git a/pyiceberg/utils/lazydict.py b/pyiceberg/utils/lazydict.py
index dfe251c0a7..ea70c78c7a 100644
--- a/pyiceberg/utils/lazydict.py
+++ b/pyiceberg/utils/lazydict.py
@@ -66,3 +66,7 @@ def __len__(self) -> int:
         """Return the number of items in the dictionary."""
         source = self._dict or self._build_dict()
         return len(source)
+
+    def __dict__(self) -> Dict[K, V]:  # type: ignore
+        """Convert the lazy dict into a dict."""
+        return self._dict or self._build_dict()
diff --git a/tests/conftest.py b/tests/conftest.py
index 8c381a1c4c..aa09517b6a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
 def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
     import pyarrow as pa
 
-    """PyArrow table with all kinds of columns"""
+    """Pyarrow table with all kinds of columns."""
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
new file mode 100644
index 0000000000..f2515caee8
--- /dev/null
+++ b/tests/integration/test_inspect_table.py
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import math
+from datetime import date, datetime
+
+import pyarrow as pa
+import pytest
+import pytz
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.typedef import Properties
+from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    LongType,
+    NestedField,
+    StringType,
+    TimestampType,
+    TimestamptzType,
+)
+
+TABLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False),
+    NestedField(field_id=2, name="string", field_type=StringType(), required=False),
+    NestedField(field_id=3, name="string_long", field_type=StringType(), required=False),
+    NestedField(field_id=4, name="int", field_type=IntegerType(), required=False),
+    NestedField(field_id=5, name="long", field_type=LongType(), required=False),
+    NestedField(field_id=6, name="float", field_type=FloatType(), required=False),
+    NestedField(field_id=7, name="double", field_type=DoubleType(), required=False),
+    NestedField(field_id=8, name="timestamp", field_type=TimestampType(), required=False),
+    NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(), required=False),
+    NestedField(field_id=10, name="date", field_type=DateType(), required=False),
+    # NestedField(field_id=11, name="time", field_type=TimeType(), required=False),
+    # NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False),
+    NestedField(field_id=12, name="binary", field_type=BinaryType(), required=False),
+    NestedField(field_id=13, name="fixed", field_type=FixedType(16), required=False),
+)
+
+
+def _create_table(session_catalog: Catalog, identifier: str, properties: Properties) -> Table:
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    return session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_snapshots(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_snapshots"
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    tbl.overwrite(arrow_table_with_null)
+    # should produce a DELETE entry
+    tbl.overwrite(arrow_table_with_null)
+    # Since we don't rewrite, this should produce a new manifest with an ADDED entry
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.snapshots()
+
+    assert df.column_names == [
+        'committed_at',
+        'snapshot_id',
+        'parent_id',
+        'operation',
+        'manifest_list',
+        'summary',
+    ]
+
+    for committed_at in df['committed_at']:
+        assert isinstance(committed_at.as_py(), datetime)
+
+    for snapshot_id in df['snapshot_id']:
+        assert isinstance(snapshot_id.as_py(), int)
+
+    assert df['parent_id'][0].as_py() is None
+    assert df['parent_id'][1:] == df['snapshot_id'][:2]
+
+    assert [operation.as_py() for operation in df['operation']] == ['append', 'overwrite', 'append']
+
+    for manifest_list in df['manifest_list']:
+        assert manifest_list.as_py().startswith("s3://")
+
+    assert df['summary'][0].as_py() == [
+        ('added-files-size', '5459'),
+        ('added-data-files', '1'),
+        ('added-records', '3'),
+        ('total-data-files', '1'),
+        ('total-delete-files', '0'),
+        ('total-records', '3'),
+        ('total-files-size', '5459'),
+        ('total-position-deletes', '0'),
+        ('total-equality-deletes', '0'),
+    ]
+
+    lhs = spark.table(f"{identifier}.snapshots").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if column == 'summary':
+                # Arrow returns a list of tuples, instead of a dict
+                right = dict(right)
+
+            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+
+            assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_entries(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_entries"
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    # Write some data
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.entries()
+
+    assert df.column_names == [
+        'status',
+        'snapshot_id',
+        'sequence_number',
+        'file_sequence_number',
+        'data_file',
+        'readable_metrics',
+    ]
+
+    # Make sure that they are filled properly
+    for int_column in ['status', 'snapshot_id', 'sequence_number', 'file_sequence_number']:
+        for value in df[int_column]:
+            assert isinstance(value.as_py(), int)
+
+    for snapshot_id in df['snapshot_id']:
+        assert isinstance(snapshot_id.as_py(), int)
+
+    lhs = df.to_pandas()
+    rhs = spark.table(f"{identifier}.entries").toPandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if column == 'data_file':
+                right = right.asDict(recursive=True)
+                for df_column in left.keys():
+                    if df_column == 'partition':
+                        # Spark leaves out the partition if the table is unpartitioned
+                        continue
+
+                    df_lhs = left[df_column]
+                    df_rhs = right[df_column]
+                    if isinstance(df_rhs, dict):
+                        # Arrow turns dicts into lists of tuples
+                        df_lhs = dict(df_lhs)
+
+                    assert df_lhs == df_rhs, f"Difference in data_file column {df_column}: {df_lhs} != {df_rhs}"
+            elif column == 'readable_metrics':
+                right = right.asDict(recursive=True)
+
+                assert list(left.keys()) == [
+                    'bool',
+                    'string',
+                    'string_long',
+                    'int',
+                    'long',
+                    'float',
+                    'double',
+                    'timestamp',
+                    'timestamptz',
+                    'date',
+                    'binary',
+                    'fixed',
+                ]
+
+                assert left.keys() == right.keys()
+
+                for rm_column in left.keys():
+                    rm_lhs = left[rm_column]
+                    rm_rhs = right[rm_column]
+
+                    assert rm_lhs['column_size'] == rm_rhs['column_size']
+                    assert rm_lhs['value_count'] == rm_rhs['value_count']
+                    assert rm_lhs['null_value_count'] == rm_rhs['null_value_count']
+                    assert rm_lhs['nan_value_count'] == rm_rhs['nan_value_count']
+
+                    if rm_column == 'timestamptz':
+                        # PySpark does not correctly set the timestamptz
+                        rm_rhs['lower_bound'] = rm_rhs['lower_bound'].replace(tzinfo=pytz.utc)
+                        rm_rhs['upper_bound'] = rm_rhs['upper_bound'].replace(tzinfo=pytz.utc)
+
+                    assert rm_lhs['lower_bound'] == rm_rhs['lower_bound']
+                    assert rm_lhs['upper_bound'] == rm_rhs['upper_bound']
+            else:
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_entries_partitioned"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            dt date
+        )
+        PARTITIONED BY (months(dt))
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (CAST('2021-01-01' AS date))
+        """
+    )
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        REPLACE PARTITION FIELD dt_month WITH days(dt)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (CAST('2021-02-01' AS date))
+        """
+    )
+
+    df = session_catalog.load_table(identifier).inspect.entries()
+
+    assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': date(2021, 2, 1), 'dt_month': None}
+    assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 7756702368..e950fb43b1 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -96,34 +96,6 @@
 )
 
-
-@pytest.fixture(scope="session")
-def pa_schema() -> pa.Schema:
-    return pa.schema([
-        ("bool", pa.bool_()),
-        ("string", pa.string()),
-        ("string_long", pa.string()),
-        ("int", pa.int32()),
-        ("long", pa.int64()),
-        ("float", pa.float32()),
-        ("double", pa.float64()),
-        ("timestamp", pa.timestamp(unit="us")),
-        ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
-        ("date", pa.date32()),
-        # Not supported by Spark
-        # ("time", pa.time64("us")),
-        # Not natively supported by Arrow
-        # ("uuid", pa.fixed(16)),
-        ("binary", pa.large_binary()),
-        ("fixed", pa.binary(16)),
-    ])
-
-
-@pytest.fixture(scope="session")
-def arrow_table_with_null(pa_schema: pa.Schema) -> pa.Table:
-    """PyArrow table with all kinds of columns"""
-    return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
-
-
 @pytest.fixture(scope="session")
 def arrow_table_without_data(pa_schema: pa.Schema) -> pa.Table:
     """PyArrow table with all kinds of columns"""
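To make the behaviour of the new `TableMetadata.specs_struct()` helper concrete: for the partition evolution exercised by `test_inspect_entries_partitioned` above (a spec that moved from `months(dt)` to `days(dt)`), the combined struct contains both partition fields and marks both as optional. This is a sketch only; the field IDs below are illustrative and not taken from the diff.

```python
from pyiceberg.types import DateType, IntegerType, NestedField, StructType

# Sketch of the combined partition struct for a spec that evolved from
# months(dt) to days(dt). Both fields are optional, because a file written
# under one spec carries no value for the other spec's field, which is why
# the test above expects {'dt_day': date(2021, 2, 1), 'dt_month': None} for
# the file written after the REPLACE PARTITION FIELD statement.
combined_partition_struct = StructType(
    NestedField(field_id=1000, name="dt_month", field_type=IntegerType(), required=False),  # months since 1970-01
    NestedField(field_id=1001, name="dt_day", field_type=DateType(), required=False),
)
```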