@@ -4,7 +4,7 @@ use std::collections::HashMap;
44use std:: error:: Error ;
55use std:: sync:: Arc ;
66
7- use futures:: future:: { self , FutureExt } ;
7+ use futures:: future;
88use futures:: stream:: StreamExt ;
99use maplit:: hashmap;
1010use rdkafka_sys:: RDKafkaErrorCode ;
@@ -491,7 +491,7 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
491491 Ok ( ( ) )
492492}
493493
494- #[ tokio:: test]
494+ #[ tokio:: test( flavor = "multi_thread" ) ]
495495async fn test_consume_partition_order ( ) {
496496 let _r = env_logger:: try_init ( ) ;
497497
@@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
545545 let partition1 = consumer. split_partition_queue ( & topic_name, 1 ) . unwrap ( ) ;
546546
547547 let mut i = 0 ;
548- while i < 12 {
549- if let Some ( m) = consumer. recv ( ) . now_or_never ( ) {
548+ while i < 5 {
549+ if let Ok ( m) = time :: timeout ( Duration :: from_millis ( 1000 ) , consumer. recv ( ) ) . await {
550550 // retry on transient errors until we get a message
551551 let m = match m {
552552 Err ( KafkaError :: MessageConsumption (
@@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
564564 let partition: i32 = m. partition ( ) ;
565565 assert ! ( partition == 0 || partition == 2 ) ;
566566 i += 1 ;
567+ } else {
568+ panic ! ( "Timeout receiving message" ) ;
567569 }
568570
569- if let Some ( m) = partition1. recv ( ) . now_or_never ( ) {
571+ if let Ok ( m) = time :: timeout ( Duration :: from_millis ( 1000 ) , partition1. recv ( ) ) . await {
570572 // retry on transient errors until we get a message
571573 let m = match m {
572574 Err ( KafkaError :: MessageConsumption (
@@ -583,6 +585,8 @@ async fn test_consume_partition_order() {
583585 } ;
584586 assert_eq ! ( m. partition( ) , 1 ) ;
585587 i += 1 ;
588+ } else {
589+ panic ! ( "Timeout receiving message" ) ;
586590 }
587591 }
588592 }
0 commit comments