2020from __future__ import annotations
2121
2222import warnings
23- from typing import TYPE_CHECKING , Any , Protocol , Sequence
23+ from typing import TYPE_CHECKING , Any , Protocol
2424
2525import pyarrow as pa
2626
3131
3232from datafusion .catalog import Catalog , CatalogProvider , Table
3333from datafusion .dataframe import DataFrame
34- from datafusion .expr import SortKey , sort_list_to_raw_sort_list
34+ from datafusion .expr import Expr , SortExpr , sort_list_to_raw_sort_list
3535from datafusion .record_batch import RecordBatchStream
3636from datafusion .user_defined import AggregateUDF , ScalarUDF , TableFunction , WindowUDF
3737
@@ -553,7 +553,7 @@ def register_listing_table(
553553 table_partition_cols : list [tuple [str , str | pa .DataType ]] | None = None ,
554554 file_extension : str = ".parquet" ,
555555 schema : pa .Schema | None = None ,
556- file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
556+ file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
557557 ) -> None :
558558 """Register multiple files as a single table.
559559
@@ -567,20 +567,23 @@ def register_listing_table(
567567 table_partition_cols: Partition columns.
568568 file_extension: File extension of the provided table.
569569 schema: The data source schema.
570- file_sort_order: Sort order for the file. Each sort key can be
571- specified as a column name (``str``), an expression
572- (``Expr``), or a ``SortExpr``.
570+ file_sort_order: Sort order for the file.
573571 """
574572 if table_partition_cols is None :
575573 table_partition_cols = []
576574 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
575+ file_sort_order_raw = (
576+ [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
577+ if file_sort_order is not None
578+ else None
579+ )
577580 self .ctx .register_listing_table (
578581 name ,
579582 str (path ),
580583 table_partition_cols ,
581584 file_extension ,
582585 schema ,
583- self . _convert_file_sort_order ( file_sort_order ) ,
586+ file_sort_order_raw ,
584587 )
585588
586589 def sql (self , query : str , options : SQLOptions | None = None ) -> DataFrame :
@@ -805,7 +808,7 @@ def register_parquet(
805808 file_extension : str = ".parquet" ,
806809 skip_metadata : bool = True ,
807810 schema : pa .Schema | None = None ,
808- file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
811+ file_sort_order : list [ list [ SortExpr ]] | None = None ,
809812 ) -> None :
810813 """Register a Parquet file as a table.
811814
@@ -824,9 +827,7 @@ def register_parquet(
824827 that may be in the file schema. This can help avoid schema
825828 conflicts due to metadata.
826829 schema: The data source schema.
827- file_sort_order: Sort order for the file. Each sort key can be
828- specified as a column name (``str``), an expression
829- (``Expr``), or a ``SortExpr``.
830+ file_sort_order: Sort order for the file.
830831 """
831832 if table_partition_cols is None :
832833 table_partition_cols = []
@@ -839,7 +840,9 @@ def register_parquet(
839840 file_extension ,
840841 skip_metadata ,
841842 schema ,
842- self ._convert_file_sort_order (file_sort_order ),
843+ [sort_list_to_raw_sort_list (exprs ) for exprs in file_sort_order ]
844+ if file_sort_order is not None
845+ else None ,
843846 )
844847
845848 def register_csv (
@@ -1096,7 +1099,7 @@ def read_parquet(
10961099 file_extension : str = ".parquet" ,
10971100 skip_metadata : bool = True ,
10981101 schema : pa .Schema | None = None ,
1099- file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
1102+ file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
11001103 ) -> DataFrame :
11011104 """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
11021105
@@ -1113,17 +1116,19 @@ def read_parquet(
11131116 schema: An optional schema representing the parquet files. If None,
11141117 the parquet reader will try to infer it based on data in the
11151118 file.
1116- file_sort_order: Sort order for the file. Each sort key can be
1117- specified as a column name (``str``), an expression
1118- (``Expr``), or a ``SortExpr``.
1119+ file_sort_order: Sort order for the file.
11191120
11201121 Returns:
11211122 DataFrame representation of the read Parquet files
11221123 """
11231124 if table_partition_cols is None :
11241125 table_partition_cols = []
11251126 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
1126- file_sort_order = self ._convert_file_sort_order (file_sort_order )
1127+ file_sort_order = (
1128+ [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1129+ if file_sort_order is not None
1130+ else None
1131+ )
11271132 return DataFrame (
11281133 self .ctx .read_parquet (
11291134 str (path ),
@@ -1174,24 +1179,6 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
11741179 """Execute the ``plan`` and return the results."""
11751180 return RecordBatchStream (self .ctx .execute (plan ._raw_plan , partitions ))
11761181
1177- @staticmethod
1178- def _convert_file_sort_order (
1179- file_sort_order : Sequence [Sequence [SortKey ]] | None ,
1180- ) -> list [list [Any ]] | None :
1181- """Convert nested ``SortKey`` sequences into raw sort representations.
1182-
1183- Each ``SortKey`` can be a column name string, an ``Expr``, or a
1184- ``SortExpr`` and will be converted using
1185- :func:`datafusion.expr.sort_list_to_raw_sort_list`.
1186- """
1187- # Convert each ``SortKey`` in the provided sort order to the low-level
1188- # representation expected by the Rust bindings.
1189- return (
1190- [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1191- if file_sort_order is not None
1192- else None
1193- )
1194-
11951182 @staticmethod
11961183 def _convert_table_partition_cols (
11971184 table_partition_cols : list [tuple [str , str | pa .DataType ]],
0 commit comments