Skip to content

Commit 34fc335

Browse files
committed
Expose a close_queue and closed methods
If you have a consumer wrapping this one (FFI cases), the outer consumer must close the queue and serve the events via Poll. Otherwise it will hang forever as prior to calling close there's a rebalance & rdkafka awaits a response before continuing.
1 parent 4fb2266 commit 34fc335

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

src/consumer/base_consumer.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,27 @@ where
331331
PartitionQueue::new(self.clone(), queue)
332332
})
333333
}
334+
335+
/// Close the queue used by a consumer.
336+
/// Only exposed for advanced usage of this API and should not be used under normal circumstances.
337+
pub fn close_queue(&self) -> KafkaResult<()> {
338+
let err = unsafe {
339+
RDKafkaError::from_ptr(rdsys::rd_kafka_consumer_close_queue(
340+
self.client.native_ptr(),
341+
self.queue.ptr(),
342+
))
343+
};
344+
if err.is_error() {
345+
Err(KafkaError::ConsumerQueueClose(err.code()))
346+
} else {
347+
Ok(())
348+
}
349+
}
350+
351+
/// Returns true if the consumer is closed, else false.
352+
pub fn closed(&self) -> bool {
353+
unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
354+
}
334355
}
335356

336357
impl<C> Consumer<C> for BaseConsumer<C>

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ pub enum KafkaError {
147147
ClientCreation(String),
148148
/// Consumer commit failed.
149149
ConsumerCommit(RDKafkaErrorCode),
150+
/// Consumer queue close failed.
151+
ConsumerQueueClose(RDKafkaErrorCode),
150152
/// Flushing failed
151153
Flush(RDKafkaErrorCode),
152154
/// Global error.
@@ -204,6 +206,9 @@ impl fmt::Debug for KafkaError {
204206
KafkaError::ConsumerCommit(err) => {
205207
write!(f, "KafkaError (Consumer commit error: {})", err)
206208
}
209+
KafkaError::ConsumerQueueClose(err) => {
210+
write!(f, "KafkaError (Consumer queue close error: {})", err)
211+
}
207212
KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
208213
KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
209214
KafkaError::GroupListFetch(err) => {
@@ -255,6 +260,7 @@ impl fmt::Display for KafkaError {
255260
}
256261
KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
257262
KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
263+
KafkaError::ConsumerQueueClose(err) => write!(f, "Consumer queue close error: {}", err),
258264
KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
259265
KafkaError::Global(err) => write!(f, "Global error: {}", err),
260266
KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
@@ -288,6 +294,7 @@ impl Error for KafkaError {
288294
KafkaError::ClientConfig(..) => None,
289295
KafkaError::ClientCreation(_) => None,
290296
KafkaError::ConsumerCommit(err) => Some(err),
297+
KafkaError::ConsumerQueueClose(err) => Some(err),
291298
KafkaError::Flush(err) => Some(err),
292299
KafkaError::Global(err) => Some(err),
293300
KafkaError::GroupListFetch(err) => Some(err),
@@ -327,6 +334,7 @@ impl KafkaError {
327334
KafkaError::ClientConfig(..) => None,
328335
KafkaError::ClientCreation(_) => None,
329336
KafkaError::ConsumerCommit(err) => Some(*err),
337+
KafkaError::ConsumerQueueClose(err) => Some(*err),
330338
KafkaError::Flush(err) => Some(*err),
331339
KafkaError::Global(err) => Some(*err),
332340
KafkaError::GroupListFetch(err) => Some(*err),

0 commit comments

Comments
 (0)