Skip to content

Commit 3b13940

Browse files
committed
Propagate fatal errors
With the Event API we propagate generic client instance-level errors, such as broker connection failures, authentication issues, etc. However, fatal errors are also propagated via the Event API. These indicates that the particular instance of the client (producer/consumer) becomes non-functional.
1 parent 7202e7b commit 3b13940

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

src/consumer/base_consumer.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,16 @@ where
225225
fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
226226
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
227227
if rdkafka_err.is_error() {
228-
let err = match rdkafka_err {
229-
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
230-
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
231-
let partition = unsafe { (*tp_ptr).partition };
232-
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
233-
KafkaError::PartitionEOF(partition)
234-
}
235-
e => KafkaError::MessageConsumption(e.into()),
236-
};
237-
Some(err)
228+
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
229+
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
230+
let partition = unsafe { (*tp_ptr).partition };
231+
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
232+
Some(KafkaError::PartitionEOF(partition))
233+
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
234+
Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
235+
} else {
236+
Some(KafkaError::MessageConsumption(rdkafka_err.into()))
237+
}
238238
} else {
239239
None
240240
}

src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ pub enum KafkaError {
157157
GroupListFetch(RDKafkaErrorCode),
158158
/// Message consumption failed.
159159
MessageConsumption(RDKafkaErrorCode),
160+
/// Message consumption failed with fatal error.
161+
MessageConsumptionFatal(RDKafkaErrorCode),
160162
/// Message production error.
161163
MessageProduction(RDKafkaErrorCode),
162164
/// Metadata fetch error.
@@ -217,6 +219,9 @@ impl fmt::Debug for KafkaError {
217219
KafkaError::MessageConsumption(err) => {
218220
write!(f, "KafkaError (Message consumption error: {})", err)
219221
}
222+
KafkaError::MessageConsumptionFatal(err) => {
223+
write!(f, "(Fatal) KafkaError (Message consumption error: {})", err)
224+
}
220225
KafkaError::MessageProduction(err) => {
221226
write!(f, "KafkaError (Message production error: {})", err)
222227
}
@@ -265,6 +270,9 @@ impl fmt::Display for KafkaError {
265270
KafkaError::Global(err) => write!(f, "Global error: {}", err),
266271
KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
267272
KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
273+
KafkaError::MessageConsumptionFatal(err) => {
274+
write!(f, "(Fatal) Message consumption error: {}", err)
275+
}
268276
KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
269277
KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
270278
KafkaError::NoMessageReceived => {
@@ -299,6 +307,7 @@ impl Error for KafkaError {
299307
KafkaError::Global(err) => Some(err),
300308
KafkaError::GroupListFetch(err) => Some(err),
301309
KafkaError::MessageConsumption(err) => Some(err),
310+
KafkaError::MessageConsumptionFatal(err) => Some(err),
302311
KafkaError::MessageProduction(err) => Some(err),
303312
KafkaError::MetadataFetch(err) => Some(err),
304313
KafkaError::NoMessageReceived => None,
@@ -339,6 +348,7 @@ impl KafkaError {
339348
KafkaError::Global(err) => Some(*err),
340349
KafkaError::GroupListFetch(err) => Some(*err),
341350
KafkaError::MessageConsumption(err) => Some(*err),
351+
KafkaError::MessageConsumptionFatal(err) => Some(*err),
342352
KafkaError::MessageProduction(err) => Some(*err),
343353
KafkaError::MetadataFetch(err) => Some(*err),
344354
KafkaError::NoMessageReceived => None,

0 commit comments

Comments
 (0)