@@ -920,7 +920,7 @@ fn record_batch_into_schema(
920920) -> Result < RecordBatch , ArrowError > {
921921 let schema = Arc :: new ( schema. clone ( ) ) ;
922922 let base_schema = record_batch. schema ( ) ;
923- if ( base_schema. fields ( ) . len ( ) == 0 ) {
923+ if base_schema. fields ( ) . len ( ) == 0 {
924924 // Nothing to project
925925 return Ok ( RecordBatch :: new_empty ( schema) ) ;
926926 }
@@ -980,160 +980,60 @@ async fn collect_record_batches_to_display(
980980 let mut record_batches = Vec :: default ( ) ;
981981 let mut has_more = false ;
982982
983- println ! (
984- "==> Starting loop with min_rows: {}, max_rows: {}, max_table_bytes: {}" ,
985- min_rows, max_rows, config. max_table_bytes
986- ) ;
987-
988983 while ( size_estimate_so_far < config. max_table_bytes && rows_so_far < max_rows)
989984 || rows_so_far < min_rows
990985 {
991- println ! (
992- "==> Loop condition: size_estimate_so_far ({}) < max_table_bytes ({})? {}" ,
993- size_estimate_so_far,
994- config. max_table_bytes,
995- size_estimate_so_far < config. max_table_bytes
996- ) ;
997- println ! (
998- "==> Loop condition: rows_so_far ({}) < max_rows ({})? {}" ,
999- rows_so_far,
1000- max_rows,
1001- rows_so_far < max_rows
1002- ) ;
1003- println ! (
1004- "==> Loop condition: rows_so_far ({}) < min_rows ({})? {}" ,
1005- rows_so_far,
1006- min_rows,
1007- rows_so_far < min_rows
1008- ) ;
1009-
1010986 let mut rb = match stream. next ( ) . await {
1011987 None => {
1012- println ! ( "==> Exiting loop: stream.next() returned None (no more data)" ) ;
1013988 break ;
1014989 }
1015990 Some ( Ok ( r) ) => r,
1016991 Some ( Err ( e) ) => return Err ( e) ,
1017992 } ;
1018993
1019994 let mut rows_in_rb = rb. num_rows ( ) ;
1020- println ! ( "==> Received batch with {} rows" , rows_in_rb) ;
1021-
1022995 if rows_in_rb > 0 {
1023996 size_estimate_so_far += rb. get_array_memory_size ( ) ;
1024- println ! ( "==> New size_estimate_so_far: {}" , size_estimate_so_far) ;
1025997
1026998 if size_estimate_so_far > config. max_table_bytes {
1027- println ! (
1028- "==> Size limit reached: {} > {}" ,
1029- size_estimate_so_far, config. max_table_bytes
1030- ) ;
1031999 let ratio = config. max_table_bytes as f32 / size_estimate_so_far as f32 ;
10321000 let total_rows = rows_in_rb + rows_so_far;
10331001
10341002 let mut reduced_row_num = ( total_rows as f32 * ratio) . round ( ) as usize ;
10351003 if reduced_row_num < min_rows {
10361004 reduced_row_num = min_rows. min ( total_rows) ;
1037- println ! (
1038- "==> Adjusted reduced_row_num to {} to meet min_rows" ,
1039- reduced_row_num
1040- ) ;
10411005 }
10421006
10431007 let limited_rows_this_rb = reduced_row_num - rows_so_far;
1044- println ! (
1045- "==> Limiting to {} rows in this batch (reduced_row_num: {}, rows_so_far: {})" ,
1046- limited_rows_this_rb, reduced_row_num, rows_so_far
1047- ) ;
1048-
10491008 if limited_rows_this_rb < rows_in_rb {
10501009 rows_in_rb = limited_rows_this_rb;
10511010 rb = rb. slice ( 0 , limited_rows_this_rb) ;
10521011 has_more = true ;
1053- println ! ( "==> Sliced batch to {} rows" , limited_rows_this_rb) ;
10541012 }
10551013 }
10561014
10571015 if rows_in_rb + rows_so_far > max_rows {
1058- println ! (
1059- "==> Row limit reached: {} + {} > {}" ,
1060- rows_in_rb, rows_so_far, max_rows
1061- ) ;
10621016 rb = rb. slice ( 0 , max_rows - rows_so_far) ;
10631017 has_more = true ;
1064- println ! (
1065- "==> Sliced batch to {} rows to meet max_rows" ,
1066- max_rows - rows_so_far
1067- ) ;
10681018 }
10691019
10701020 rows_so_far += rb. num_rows ( ) ;
10711021 record_batches. push ( rb) ;
1072- println ! (
1073- "==> Added batch: size_estimate_so_far: {}, rows_so_far: {}" ,
1074- size_estimate_so_far, rows_so_far
1075- ) ;
1076- } else {
1077- println ! ( "==> Skipping empty batch" ) ;
10781022 }
10791023 }
10801024
1081- println ! ( "==> Exited while loop: size_estimate_so_far: {}, rows_so_far: {}, min_rows: {}, max_rows: {}" ,
1082- size_estimate_so_far, rows_so_far, min_rows, max_rows) ;
1083- println ! ( "==> Loop condition evaluation at exit:" ) ;
1084- println ! (
1085- "==> size_estimate_so_far < config.max_table_bytes: {} < {} = {}" ,
1086- size_estimate_so_far,
1087- config. max_table_bytes,
1088- size_estimate_so_far < config. max_table_bytes
1089- ) ;
1090- println ! (
1091- "==> rows_so_far < max_rows: {} < {} = {}" ,
1092- rows_so_far,
1093- max_rows,
1094- rows_so_far < max_rows
1095- ) ;
1096- println ! (
1097- "==> rows_so_far < min_rows: {} < {} = {}" ,
1098- rows_so_far,
1099- min_rows,
1100- rows_so_far < min_rows
1101- ) ;
1102- println ! (
1103- "==> Combined condition: {} || {} = {}" ,
1104- ( size_estimate_so_far < config. max_table_bytes && rows_so_far < max_rows) ,
1105- rows_so_far < min_rows,
1106- ( size_estimate_so_far < config. max_table_bytes && rows_so_far < max_rows)
1107- || rows_so_far < min_rows
1108- ) ;
1109-
11101025 if record_batches. is_empty ( ) {
1111- println ! ( "==> No record batches collected" ) ;
11121026 return Ok ( ( Vec :: default ( ) , false ) ) ;
11131027 }
11141028
11151029 if !has_more {
11161030 // Data was not already truncated, so check to see if more record batches remain
11171031 has_more = match stream. try_next ( ) . await {
1118- Ok ( None ) => {
1119- println ! ( "==> No more record batches in stream" ) ;
1120- false
1121- } // reached end
1122- Ok ( Some ( _) ) => {
1123- println ! ( "==> More record batches available in stream" ) ;
1124- true
1125- }
1126- Err ( _) => {
1127- println ! ( "==> Stream error or disconnected" ) ;
1128- false
1129- } // Stream disconnected
1032+ Ok ( None ) => false , // reached end
1033+ Ok ( Some ( _) ) => true ,
1034+ Err ( _) => false , // Stream disconnected
11301035 } ;
11311036 }
11321037
1133- println ! (
1134- "==> Returning {} record batches, has_more: {}" ,
1135- record_batches. len( ) ,
1136- has_more
1137- ) ;
11381038 Ok ( ( record_batches, has_more) )
11391039}
0 commit comments