Skip to content

Commit 7202e7b

Browse files
committed
Use closed and close_queue methods on drop
1 parent 34fc335 commit 7202e7b

File tree

1 file changed

+6
-9
lines changed

1 file changed

+6
-9
lines changed

src/consumer/base_consumer.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -716,15 +716,12 @@ where
716716
fn drop(&mut self) {
717717
trace!("Destroying consumer: {:?}", self.client.native_ptr());
718718
if self.group_id.is_some() {
719-
let err = unsafe {
720-
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
721-
};
722-
if !err.is_null() {
723-
error!("Failed to close the consumer queue on drop");
724-
}
725-
726-
while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
727-
self.poll(Duration::from_millis(100));
719+
if let Err(err) = self.close_queue() {
720+
error!("Failed to close consumer queue on drop: {}", err);
721+
} else {
722+
while !self.closed() {
723+
self.poll(Duration::from_millis(100));
724+
}
728725
}
729726
}
730727
trace!("Consumer destroyed: {:?}", self.client.native_ptr());

0 commit comments

Comments
 (0)