@@ -254,6 +254,8 @@ async fn agg_grouped_topk_yields(
254254
255255#[ rstest]
256256#[ tokio:: test]
257+ // A test that mocks the behavior of `SpillManager::read_spill_as_stream` without file access
258+ // to verify that a cooperative stream would properly yields in a spill file read scenario
257259async fn spill_reader_stream_yield ( ) -> Result < ( ) , Box < dyn Error > > {
258260 use arrow:: compute:: concat_batches;
259261 use datafusion_physical_plan:: common:: spawn_buffered;
@@ -272,8 +274,9 @@ async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> {
272274
273275 let consumer_stream = futures:: stream:: poll_fn ( move |cx| {
274276 let mut collected = vec ! [ ] ;
275- // To make sure that inner stream is polled multiple times, loop forever if inner (producer) stream returns Ready
276- loop {
277+ // To make sure that inner stream is polled multiple times, loop until the buffer is full
278+ // Ideally, the stream will yield before the loop ends
279+ for _ in 0 ..buffer_capacity {
277280 match mock_stream. as_mut ( ) . poll_next ( cx) {
278281 Poll :: Ready ( Some ( Ok ( batch) ) ) => {
279282 collected. push ( batch) ;
@@ -293,6 +296,7 @@ async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> {
293296 }
294297
295298 // This should be unreachable since the stream is canceled
299+ assert ! ( false , "This should be unreachable if cooperative stream properly yields" ) ;
296300 let combined = concat_batches ( & mock_stream. schema ( ) , & collected)
297301 . expect ( "Failed to concat batches" ) ;
298302
0 commit comments