@@ -4,7 +4,7 @@ use std::collections::HashMap;
44use std:: error:: Error ;
55use std:: sync:: Arc ;
66
7- use futures:: future:: { self , FutureExt } ;
7+ use futures:: future;
88use futures:: stream:: StreamExt ;
99use maplit:: hashmap;
1010use rdkafka_sys:: RDKafkaErrorCode ;
@@ -70,7 +70,7 @@ async fn test_produce_consume_base() {
7070 let _r = env_logger:: try_init ( ) ;
7171
7272 let start_time = current_time_millis ( ) ;
73- let topic_name = rand_test_topic ( ) ;
73+ let topic_name = rand_test_topic ( "test_produce_consume_base" ) ;
7474 let message_map = populate_topic ( & topic_name, 100 , & value_fn, & key_fn, None , None ) . await ;
7575 let consumer = create_stream_consumer ( & rand_test_group ( ) , None ) ;
7676 consumer. subscribe ( & [ topic_name. as_str ( ) ] ) . unwrap ( ) ;
@@ -105,7 +105,7 @@ async fn test_produce_consume_base() {
105105async fn test_produce_consume_base_concurrent ( ) {
106106 let _r = env_logger:: try_init ( ) ;
107107
108- let topic_name = rand_test_topic ( ) ;
108+ let topic_name = rand_test_topic ( "test_produce_consume_base_concurrent" ) ;
109109 populate_topic ( & topic_name, 100 , & value_fn, & key_fn, None , None ) . await ;
110110
111111 let consumer = Arc :: new ( create_stream_consumer ( & rand_test_group ( ) , None ) ) ;
@@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() {
135135async fn test_produce_consume_base_assign ( ) {
136136 let _r = env_logger:: try_init ( ) ;
137137
138- let topic_name = rand_test_topic ( ) ;
138+ let topic_name = rand_test_topic ( "test_produce_consume_base_assign" ) ;
139139 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 0 ) , None ) . await ;
140140 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 1 ) , None ) . await ;
141141 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 2 ) , None ) . await ;
@@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() {
170170async fn test_produce_consume_base_unassign ( ) {
171171 let _r = env_logger:: try_init ( ) ;
172172
173- let topic_name = rand_test_topic ( ) ;
173+ let topic_name = rand_test_topic ( "test_produce_consume_base_unassign" ) ;
174174 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 0 ) , None ) . await ;
175175 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 1 ) , None ) . await ;
176176 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 2 ) , None ) . await ;
@@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() {
195195async fn test_produce_consume_base_incremental_assign_and_unassign ( ) {
196196 let _r = env_logger:: try_init ( ) ;
197197
198- let topic_name = rand_test_topic ( ) ;
198+ let topic_name = rand_test_topic ( "test_produce_consume_base_incremental_assign_and_unassign" ) ;
199199 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 0 ) , None ) . await ;
200200 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 1 ) , None ) . await ;
201201 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 2 ) , None ) . await ;
@@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() {
236236async fn test_produce_consume_with_timestamp ( ) {
237237 let _r = env_logger:: try_init ( ) ;
238238
239- let topic_name = rand_test_topic ( ) ;
239+ let topic_name = rand_test_topic ( "test_produce_consume_with_timestamp" ) ;
240240 let message_map =
241241 populate_topic ( & topic_name, 100 , & value_fn, & key_fn, Some ( 0 ) , Some ( 1111 ) ) . await ;
242242 let consumer = create_stream_consumer ( & rand_test_group ( ) , None ) ;
@@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() {
277277async fn test_consumer_commit_message ( ) {
278278 let _r = env_logger:: try_init ( ) ;
279279
280- let topic_name = rand_test_topic ( ) ;
280+ let topic_name = rand_test_topic ( "test_consumer_commit_message" ) ;
281281 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 0 ) , None ) . await ;
282282 populate_topic ( & topic_name, 11 , & value_fn, & key_fn, Some ( 1 ) , None ) . await ;
283283 populate_topic ( & topic_name, 12 , & value_fn, & key_fn, Some ( 2 ) , None ) . await ;
@@ -355,7 +355,7 @@ async fn test_consumer_commit_message() {
355355async fn test_consumer_store_offset_commit ( ) {
356356 let _r = env_logger:: try_init ( ) ;
357357
358- let topic_name = rand_test_topic ( ) ;
358+ let topic_name = rand_test_topic ( "test_consumer_store_offset_commit" ) ;
359359 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, Some ( 0 ) , None ) . await ;
360360 populate_topic ( & topic_name, 11 , & value_fn, & key_fn, Some ( 1 ) , None ) . await ;
361361 populate_topic ( & topic_name, 12 , & value_fn, & key_fn, Some ( 2 ) , None ) . await ;
@@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() {
440440async fn test_consumer_commit_metadata ( ) -> Result < ( ) , Box < dyn Error > > {
441441 let _ = env_logger:: try_init ( ) ;
442442
443- let topic_name = rand_test_topic ( ) ;
443+ let topic_name = rand_test_topic ( "test_consumer_commit_metadata" ) ;
444444 let group_name = rand_test_group ( ) ;
445445 populate_topic ( & topic_name, 10 , & value_fn, & key_fn, None , None ) . await ;
446446
@@ -491,11 +491,11 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
491491 Ok ( ( ) )
492492}
493493
494- #[ tokio:: test]
494+ #[ tokio:: test( flavor = "multi_thread" ) ]
495495async fn test_consume_partition_order ( ) {
496496 let _r = env_logger:: try_init ( ) ;
497497
498- let topic_name = rand_test_topic ( ) ;
498+ let topic_name = rand_test_topic ( "test_consume_partition_order" ) ;
499499 populate_topic ( & topic_name, 4 , & value_fn, & key_fn, Some ( 0 ) , None ) . await ;
500500 populate_topic ( & topic_name, 4 , & value_fn, & key_fn, Some ( 1 ) , None ) . await ;
501501 populate_topic ( & topic_name, 4 , & value_fn, & key_fn, Some ( 2 ) , None ) . await ;
@@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
545545 let partition1 = consumer. split_partition_queue ( & topic_name, 1 ) . unwrap ( ) ;
546546
547547 let mut i = 0 ;
548- while i < 12 {
549- if let Some ( m) = consumer. recv ( ) . now_or_never ( ) {
548+ while i < 5 {
549+ if let Ok ( m) = time :: timeout ( Duration :: from_millis ( 1000 ) , consumer. recv ( ) ) . await {
550550 // retry on transient errors until we get a message
551551 let m = match m {
552552 Err ( KafkaError :: MessageConsumption (
@@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
564564 let partition: i32 = m. partition ( ) ;
565565 assert ! ( partition == 0 || partition == 2 ) ;
566566 i += 1 ;
567+ } else {
568+ panic ! ( "Timeout receiving message" ) ;
567569 }
568570
569- if let Some ( m) = partition1. recv ( ) . now_or_never ( ) {
571+ if let Ok ( m) = time :: timeout ( Duration :: from_millis ( 1000 ) , partition1. recv ( ) ) . await {
570572 // retry on transient errors until we get a message
571573 let m = match m {
572574 Err ( KafkaError :: MessageConsumption (
@@ -583,6 +585,8 @@ async fn test_consume_partition_order() {
583585 } ;
584586 assert_eq ! ( m. partition( ) , 1 ) ;
585587 i += 1 ;
588+ } else {
589+ panic ! ( "Timeout receiving message" ) ;
586590 }
587591 }
588592 }
0 commit comments