diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 878ae71c81..cce5250ad5 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple from pyiceberg.conversions import from_bytes -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec from pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType @@ -523,7 +523,73 @@ def history(self) -> "pa.Table": return pa.Table.from_pylist(history, schema=history_schema) - def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": + def _get_files_from_manifest( + self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None + ) -> "pa.Table": + import pyarrow as pa + + files: list[dict[str, Any]] = [] + schema = self.tbl.metadata.schema() + io = self.tbl.io + + for manifest_entry in manifest_list.fetch_manifest_entry(io): + data_file = manifest_entry.data_file + if data_file_filter and data_file.content not in data_file_filter: + continue + column_sizes = data_file.column_sizes or {} + value_counts = data_file.value_counts or {} + null_value_counts = data_file.null_value_counts or {} + nan_value_counts = data_file.nan_value_counts or {} + lower_bounds = data_file.lower_bounds or {} + upper_bounds = data_file.upper_bounds or {} + readable_metrics = { + schema.find_column_name(field.field_id): { + "column_size": column_sizes.get(field.field_id), + "value_count": value_counts.get(field.field_id), + "null_value_count": null_value_counts.get(field.field_id), + "nan_value_count": nan_value_counts.get(field.field_id), + "lower_bound": from_bytes(field.field_type, lower_bound) + if (lower_bound := lower_bounds.get(field.field_id)) + else None, + "upper_bound": from_bytes(field.field_type, upper_bound) + if (upper_bound := upper_bounds.get(field.field_id)) + else None, + } + for field in self.tbl.metadata.schema().fields + } + partition = data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest_list.partition_spec_id].fields) + } + files.append( + { + "content": data_file.content, + "file_path": data_file.file_path, + "file_format": data_file.file_format, + "spec_id": data_file.spec_id, + "partition": partition_record_dict, + "record_count": data_file.record_count, + "file_size_in_bytes": data_file.file_size_in_bytes, + "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None, + "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None, + "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None, + "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None, + "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None, + "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None, + "key_metadata": data_file.key_metadata, + "split_offsets": data_file.split_offsets, + "equality_ids": data_file.equality_ids, + "sort_order_id": data_file.sort_order_id, + "readable_metrics": 
readable_metrics, + } + ) + return pa.Table.from_pylist( + files, + schema=self._get_files_schema(), + ) + + def _get_files_schema(self) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow @@ -544,6 +610,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ] ) + partition_record = self.tbl.metadata.specs_struct() + pa_record_struct = schema_to_pyarrow(partition_record) + for field in self.tbl.metadata.schema().fields: readable_metrics_struct.append( pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False) @@ -555,6 +624,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False), pa.field("spec_id", pa.int32(), nullable=False), + pa.field("partition", pa_record_struct, nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True), @@ -570,71 +640,21 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ] ) + return files_schema - files: list[dict[str, Any]] = [] + def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": + import pyarrow as pa if not snapshot_id and not self.tbl.metadata.current_snapshot(): - return pa.Table.from_pylist( - files, - schema=files_schema, - ) - snapshot = self._get_snapshot(snapshot_id) + return self._get_files_schema().empty_table() + snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io + files_table: list[pa.Table] = [] for manifest_list in snapshot.manifests(io): - for manifest_entry in manifest_list.fetch_manifest_entry(io): - data_file = manifest_entry.data_file - if data_file_filter and data_file.content not in data_file_filter: - continue - column_sizes = data_file.column_sizes or {} - value_counts = data_file.value_counts or {} - null_value_counts = data_file.null_value_counts or {} - nan_value_counts = data_file.nan_value_counts or {} - lower_bounds = data_file.lower_bounds or {} - upper_bounds = data_file.upper_bounds or {} - readable_metrics = { - schema.find_column_name(field.field_id): { - "column_size": column_sizes.get(field.field_id), - "value_count": value_counts.get(field.field_id), - "null_value_count": null_value_counts.get(field.field_id), - "nan_value_count": nan_value_counts.get(field.field_id), - "lower_bound": from_bytes(field.field_type, lower_bound) - if (lower_bound := lower_bounds.get(field.field_id)) - else None, - "upper_bound": from_bytes(field.field_type, upper_bound) - if (upper_bound := upper_bounds.get(field.field_id)) - else None, - } - for field in self.tbl.metadata.schema().fields - } - files.append( - { - "content": data_file.content, - "file_path": data_file.file_path, - "file_format": data_file.file_format, - "spec_id": data_file.spec_id, - "record_count": data_file.record_count, - "file_size_in_bytes": data_file.file_size_in_bytes, - "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None, - "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None, - "null_value_counts": dict(data_file.null_value_counts) - if data_file.null_value_counts is not None - else None, - 
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None, - "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None, - "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None, - "key_metadata": data_file.key_metadata, - "split_offsets": data_file.split_offsets, - "equality_ids": data_file.equality_ids, - "sort_order_id": data_file.sort_order_id, - "readable_metrics": readable_metrics, - } - ) + files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter)) - return pa.Table.from_pylist( - files, - schema=files_schema, - ) + return pa.concat_tables(files_table) def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id) @@ -657,3 +677,30 @@ def all_manifests(self) -> "pa.Table": lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": + import pyarrow as pa + + snapshots = self.tbl.snapshots() + if not snapshots: + return pa.Table.from_pylist([], schema=self._get_files_schema()) + + executor = ExecutorFactory.get_or_create() + manifest_lists = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots) + + unique_manifests = {(manifest.manifest_path, manifest) for manifest_list in manifest_lists for manifest in manifest_list} + + file_lists = executor.map( + lambda args: self._get_files_from_manifest(*args), [(manifest, data_file_filter) for _, manifest in unique_manifests] + ) + + return pa.concat_tables(file_lists) + + def all_files(self) -> "pa.Table": + return self._all_files() + + def all_data_files(self) -> "pa.Table": + return self._all_files({DataFileContent.DATA}) + + def all_delete_files(self) -> "pa.Table": + return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index a2a5fe45bc..e81050a81c 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -71,6 +71,133 @@ def _create_table(session_catalog: Catalog, identifier: str, properties: Propert return session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties) +def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: + from pandas.testing import assert_frame_equal + + assert df.column_names == [ + "content", + "file_path", + "file_format", + "spec_id", + "partition", + "record_count", + "file_size_in_bytes", + "column_sizes", + "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", + "key_metadata", + "split_offsets", + "equality_ids", + "sort_order_id", + "readable_metrics", + ] + + # make sure the non-nullable fields are filled + for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]: + for value in df[int_column]: + assert isinstance(value.as_py(), int) + + for split_offsets in df["split_offsets"]: + if split_offsets.as_py() is not None: + assert isinstance(split_offsets.as_py(), list) + + for file_path in df["file_path"]: + assert file_path.as_py().startswith("s3://") + + # sort the dataframes by content and file_path to compare them, + # as the order of the files is not guaranteed in case of all_files + lhs = df.to_pandas().sort_values(by=["content", 
"file_path"]).reset_index(drop=True) + rhs = spark_df.toPandas().sort_values(by=["content", "file_path"]).reset_index(drop=True) + + lhs_subset = lhs[ + [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "split_offsets", + "equality_ids", + "sort_order_id", + ] + ] + rhs_subset = rhs[ + [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "split_offsets", + "equality_ids", + "sort_order_id", + ] + ] + + assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) + + for column in df.column_names: + if column == "partition": + # Spark leaves out the partition if the table is unpartitioned + continue + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): + # NaN != NaN in Python + continue + if column in [ + "column_sizes", + "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", + ]: + if isinstance(right, dict): + left = dict(left) + assert left == right, f"Difference in column {column}: {left} != {right}" + + elif column == "readable_metrics": + assert list(left.keys()) == [ + "bool", + "string", + "string_long", + "int", + "long", + "float", + "double", + "timestamp", + "timestamptz", + "date", + "binary", + "fixed", + ] + assert left.keys() == right.keys() + + for rm_column in left.keys(): + rm_lhs = left[rm_column] + rm_rhs = right[rm_column] + + assert rm_lhs["column_size"] == rm_rhs["column_size"] + assert rm_lhs["value_count"] == rm_rhs["value_count"] + assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] + assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] + + if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]: + # PySpark does not correctly set the timstamptz + rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) + rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) + + assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] + assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] + else: + assert left == right, f"Difference in column {column}: {left} != {right}" + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_snapshots( @@ -665,8 +792,6 @@ def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_v def test_inspect_files( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: - from pandas.testing import assert_frame_equal - identifier = "default.table_metadata_files" tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) @@ -688,129 +813,9 @@ def test_inspect_files( delete_files_df = tbl.inspect.delete_files() - def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: - assert df.column_names == [ - "content", - "file_path", - "file_format", - "spec_id", - "record_count", - "file_size_in_bytes", - "column_sizes", - "value_counts", - "null_value_counts", - "nan_value_counts", - "lower_bounds", - "upper_bounds", - "key_metadata", - "split_offsets", - "equality_ids", - "sort_order_id", - "readable_metrics", - ] - - # make sure the non-nullable fields are filled - for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]: - for value in df[int_column]: - assert isinstance(value.as_py(), int) - - for split_offsets 
in df["split_offsets"]: - assert isinstance(split_offsets.as_py(), list) - - for file_format in df["file_format"]: - assert file_format.as_py() == "PARQUET" - - for file_path in df["file_path"]: - assert file_path.as_py().startswith("s3://") - - lhs = df.to_pandas() - rhs = spark_df.toPandas() - - lhs_subset = lhs[ - [ - "content", - "file_path", - "file_format", - "spec_id", - "record_count", - "file_size_in_bytes", - "split_offsets", - "equality_ids", - "sort_order_id", - ] - ] - rhs_subset = rhs[ - [ - "content", - "file_path", - "file_format", - "spec_id", - "record_count", - "file_size_in_bytes", - "split_offsets", - "equality_ids", - "sort_order_id", - ] - ] - - assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) - - for column in df.column_names: - for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): - if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): - # NaN != NaN in Python - continue - if column in [ - "column_sizes", - "value_counts", - "null_value_counts", - "nan_value_counts", - "lower_bounds", - "upper_bounds", - ]: - if isinstance(right, dict): - left = dict(left) - assert left == right, f"Difference in column {column}: {left} != {right}" - - elif column == "readable_metrics": - assert list(left.keys()) == [ - "bool", - "string", - "string_long", - "int", - "long", - "float", - "double", - "timestamp", - "timestamptz", - "date", - "binary", - "fixed", - ] - assert left.keys() == right.keys() - - for rm_column in left.keys(): - rm_lhs = left[rm_column] - rm_rhs = right[rm_column] - - assert rm_lhs["column_size"] == rm_rhs["column_size"] - assert rm_lhs["value_count"] == rm_rhs["value_count"] - assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] - assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] - - if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]: - # PySpark does not correctly set the timstamptz - rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) - rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) - - assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] - assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] - else: - assert left == right, f"Difference in column {column}: {left} != {right}" - - inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) - inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) - inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) + _inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) + _inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) + _inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) @pytest.mark.integration @@ -823,6 +828,9 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog files_df = tbl.refresh().inspect.files() data_files_df = tbl.inspect.data_files() delete_files_df = tbl.inspect.delete_files() + all_files_df = tbl.inspect.all_files() + all_data_files_df = tbl.inspect.all_data_files() + all_delete_files_df = tbl.inspect.all_delete_files() def inspect_files_asserts(df: pa.Table) -> None: assert df.column_names == [ @@ -830,6 +838,7 @@ def inspect_files_asserts(df: pa.Table) -> None: "file_path", "file_format", "spec_id", + "partition", "record_count", "file_size_in_bytes", "column_sizes", @@ -850,6 +859,9 @@ def inspect_files_asserts(df: pa.Table) -> None: 
inspect_files_asserts(files_df) inspect_files_asserts(data_files_df) inspect_files_asserts(delete_files_df) + inspect_files_asserts(all_files_df) + inspect_files_asserts(all_data_files_df) + inspect_files_asserts(all_delete_files_df) @pytest.mark.integration @@ -942,3 +954,150 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo lhs = spark.table(f"{identifier}.all_manifests").toPandas() rhs = df.to_pandas() assert_frame_equal(lhs, rhs, check_dtype=False) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_all_files( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_metadata_files" + + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + # append three times + for _ in range(3): + tbl.append(arrow_table_with_null) + + # configure table properties + if format_version == 2: + with tbl.transaction() as txn: + txn.set_properties({"write.delete.mode": "merge-on-read"}) + txn.set_properties({"write.update.mode": "merge-on-read"}) + spark.sql(f"DELETE FROM {identifier} WHERE int = 1") + tbl.refresh() + tbl.append(arrow_table_with_null) + spark.sql(f"UPDATE {identifier} SET string = 'b' WHERE int = 9") + spark.sql(f"DELETE FROM {identifier} WHERE int = 1") + tbl.refresh() + + all_files_df = tbl.inspect.all_files() + all_data_files_df = tbl.inspect.all_data_files() + all_delete_files_df = tbl.inspect.all_delete_files() + + _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) + _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) + _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) + + +@pytest.mark.integration +def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.table_metadata_files" + + tbl = _create_table( + session_catalog, + identifier, + properties={ + "format-version": "3", + "write.delete.mode": "merge-on-read", + "write.update.mode": "merge-on-read", + "write.merge.mode": "merge-on-read", + }, + ) + + insert_data_sql = f"""INSERT INTO {identifier} VALUES + (false, 'a', 'aaaaaaaaaaaaaaaaaaaaaa', 1, 1, 0.0, 0.0, TIMESTAMP('2023-01-01 19:25:00'), TIMESTAMP('2023-01-01 19:25:00+00:00'), DATE('2023-01-01'), X'01', X'00000000000000000000000000000000'), + (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), + (true, 'z', 'zzzzzzzzzzzzzzzzzzzzzz', 9, 9, 0.9, 0.9, TIMESTAMP('2023-03-01 19:25:00'), TIMESTAMP('2023-03-01 19:25:00+00:00'), DATE('2023-03-01'), X'12', X'11111111111111111111111111111111'); + """ + + spark.sql(insert_data_sql) + spark.sql(insert_data_sql) + spark.sql(f"UPDATE {identifier} SET int = 2 WHERE int = 1") + spark.sql(f"DELETE FROM {identifier} WHERE int = 9") + + tbl.refresh() + + files_df = tbl.inspect.files() + data_files_df = tbl.inspect.data_files() + delete_files_df = tbl.inspect.delete_files() + + all_files_df = tbl.inspect.all_files() + all_data_files_df = tbl.inspect.all_data_files() + all_delete_files_df = tbl.inspect.all_delete_files() + + _inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) + _inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) + _inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) + + _inspect_files_asserts(all_files_df, 
spark.table(f"{identifier}.all_files")) + _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) + _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2, 3]) +def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_files_partitioned" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + dt date, + int_data int + ) + PARTITIONED BY (months(dt)) + TBLPROPERTIES ('format-version'='{format_version}') + """ + ) + + if format_version > 1: + spark.sql( + f""" + ALTER TABLE {identifier} SET TBLPROPERTIES( + 'write.update.mode' = 'merge-on-read', + 'write.delete.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read') + """ + ) + + spark.sql(f""" + INSERT INTO {identifier} VALUES (CAST('2025-01-01' AS date), 1), (CAST('2025-01-01' AS date), 2) + """) + + spark.sql( + f""" + ALTER TABLE {identifier} + REPLACE PARTITION FIELD dt_month WITH days(dt) + """ + ) + + spark.sql( + f""" + INSERT INTO {identifier} VALUES (CAST('2025-01-02' AS date), 2) + """ + ) + + spark.sql( + f""" + DELETE FROM {identifier} WHERE int_data = 1 + """ + ) + + tbl = session_catalog.load_table(identifier) + files_df = tbl.inspect.files() + lhs = files_df.to_pandas()[["file_path", "partition"]].sort_values("file_path", ignore_index=True).reset_index() + rhs = ( + spark.table(f"{identifier}.files") + .select(["file_path", "partition"]) + .toPandas() + .sort_values("file_path", ignore_index=True) + .reset_index() + ) + assert_frame_equal(lhs, rhs, check_dtype=False)