@@ -39,7 +39,7 @@ use crate::store::StorageContexts;
3939use crate :: udaf:: PyAggregateUDF ;
4040use crate :: udf:: PyScalarUDF ;
4141use crate :: utils:: { get_tokio_runtime, wait_for_future} ;
42- use datafusion:: arrow:: datatypes:: { DataType , Schema } ;
42+ use datafusion:: arrow:: datatypes:: { DataType , Schema , SchemaRef } ;
4343use datafusion:: arrow:: pyarrow:: PyArrowType ;
4444use datafusion:: arrow:: record_batch:: RecordBatch ;
4545use datafusion:: datasource:: file_format:: file_compression_type:: FileCompressionType ;
@@ -344,9 +344,15 @@ impl PySessionContext {
344344 & mut self ,
345345 partitions : PyArrowType < Vec < Vec < RecordBatch > > > ,
346346 name : Option < & str > ,
347+ schema : Option < PyArrowType < Schema > > ,
347348 py : Python ,
348349 ) -> PyResult < PyDataFrame > {
349- let schema = partitions. 0 [ 0 ] [ 0 ] . schema ( ) ;
350+ let schema = if let Some ( schema) = schema {
351+ SchemaRef :: from ( schema. 0 )
352+ } else {
353+ partitions. 0 [ 0 ] [ 0 ] . schema ( )
354+ } ;
355+
350356 let table = MemTable :: try_new ( schema, partitions. 0 ) . map_err ( DataFusionError :: from) ?;
351357
352358 // generate a random (unique) name for this table if none is provided
@@ -428,12 +434,15 @@ impl PySessionContext {
428434 // Instantiate pyarrow Table object & convert to batches
429435 let table = data. call_method0 ( py, "to_batches" ) ?;
430436
437+ let schema = data. getattr ( py, "schema" ) ?;
438+ let schema = schema. extract :: < PyArrowType < Schema > > ( py) ?;
439+
431440 // Cast PyObject to RecordBatch type
432441 // Because create_dataframe() expects a vector of vectors of record batches
433442 // here we need to wrap the vector of record batches in an additional vector
434443 let batches = table. extract :: < PyArrowType < Vec < RecordBatch > > > ( py) ?;
435444 let list_of_batches = PyArrowType :: from ( vec ! [ batches. 0 ] ) ;
436- self . create_dataframe ( list_of_batches, name, py)
445+ self . create_dataframe ( list_of_batches, name, Some ( schema ) , py)
437446 } )
438447 }
439448
0 commit comments