Skip to content

Commit 4fb2266

Browse files
committed
Do not panic on transient errors on test_consume_partition_order
1 parent 2af3671 commit 4fb2266

File tree

2 files changed

+61
-4
lines changed

2 files changed

+61
-4
lines changed

tests/test_high_consumers.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use futures::future::{self, FutureExt};
88
use futures::stream::StreamExt;
99
use maplit::hashmap;
10+
use rdkafka_sys::RDKafkaErrorCode;
1011
use tokio::time::{self, Duration};
1112

1213
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, StreamConsumer};
@@ -546,13 +547,41 @@ async fn test_consume_partition_order() {
546547
let mut i = 0;
547548
while i < 12 {
548549
if let Some(m) = consumer.recv().now_or_never() {
549-
let partition = m.unwrap().partition();
550+
// retry on transient errors until we get a message
551+
let m = match m {
552+
Err(KafkaError::MessageConsumption(
553+
RDKafkaErrorCode::BrokerTransportFailure,
554+
))
555+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown))
556+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => {
557+
continue
558+
}
559+
Err(err) => {
560+
panic!("Unexpected error receiving message: {:?}", err);
561+
}
562+
Ok(m) => m,
563+
};
564+
let partition: i32 = m.partition();
550565
assert!(partition == 0 || partition == 2);
551566
i += 1;
552567
}
553568

554569
if let Some(m) = partition1.recv().now_or_never() {
555-
assert_eq!(m.unwrap().partition(), 1);
570+
// retry on transient errors until we get a message
571+
let m = match m {
572+
Err(KafkaError::MessageConsumption(
573+
RDKafkaErrorCode::BrokerTransportFailure,
574+
))
575+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown))
576+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => {
577+
continue
578+
}
579+
Err(err) => {
580+
panic!("Unexpected error receiving message: {:?}", err);
581+
}
582+
Ok(m) => m,
583+
};
584+
assert_eq!(m.partition(), 1);
556585
i += 1;
557586
}
558587
}

tests/test_low_consumers.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,41 @@ async fn test_consume_partition_order() {
288288
let mut i = 0;
289289
while i < 12 {
290290
if let Some(m) = consumer.poll(Timeout::After(Duration::from_secs(0))) {
291-
let partition = m.unwrap().partition();
291+
// retry on transient errors until we get a message
292+
let m = match m {
293+
Err(KafkaError::MessageConsumption(
294+
RDKafkaErrorCode::BrokerTransportFailure,
295+
))
296+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown))
297+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => {
298+
continue
299+
}
300+
Err(err) => {
301+
panic!("Unexpected error receiving message: {:?}", err);
302+
}
303+
Ok(m) => m,
304+
};
305+
let partition = m.partition();
292306
assert!(partition == 0 || partition == 2);
293307
i += 1;
294308
}
295309

296310
if let Some(m) = partition1.poll(Timeout::After(Duration::from_secs(0))) {
297-
assert_eq!(m.unwrap().partition(), 1);
311+
// retry on transient errors until we get a message
312+
let m = match m {
313+
Err(KafkaError::MessageConsumption(
314+
RDKafkaErrorCode::BrokerTransportFailure,
315+
))
316+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::AllBrokersDown))
317+
| Err(KafkaError::MessageConsumption(RDKafkaErrorCode::OperationTimedOut)) => {
318+
continue
319+
}
320+
Err(err) => {
321+
panic!("Unexpected error receiving message: {:?}", err);
322+
}
323+
Ok(m) => m,
324+
};
325+
assert_eq!(m.partition(), 1);
298326
i += 1;
299327
}
300328
}

0 commit comments

Comments
 (0)