Skip to content

Commit 2af3671

Browse files
committed
Allow creating a consumer without group.id
Currently if a group.id is not specified we allow the use of the consumer for fetching metadata and watermarks. Keeping this behaviour.
1 parent 3b98f95 commit 2af3671

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

src/consumer/base_consumer.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ where
3737
{
3838
client: Client<C>,
3939
queue: NativeQueue,
40+
group_id: Option<String>,
4041
}
4142

4243
impl FromClientConfig for BaseConsumer {
@@ -78,14 +79,26 @@ where
7879
context,
7980
)?;
8081

81-
// Redirect rdkafka's main queue to the consumer queue so that we only
82-
// need to listen to the consumer queue to observe events like
83-
// rebalancings and stats.
84-
unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
85-
let queue = client.consumer_queue().ok_or_else(|| {
86-
KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
87-
})?;
88-
Ok(BaseConsumer { client, queue })
82+
let group_id = config.get("group.id").map(|s| s.to_string());
83+
// If a group.id is not specified, we won't redirect the main queue to the consumer queue,
84+
// allowing continued use of the consumer for fetching metadata and watermarks without the
85+
// need to specify a group.id
86+
let queue = if group_id.is_some() {
87+
// Redirect rdkafka's main queue to the consumer queue so that we only need to listen
88+
// to the consumer queue to observe events like rebalancings and stats.
89+
unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
90+
client.consumer_queue().ok_or_else(|| {
91+
KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
92+
})?
93+
} else {
94+
client.main_queue()
95+
};
96+
97+
Ok(BaseConsumer {
98+
client,
99+
queue,
100+
group_id,
101+
})
89102
}
90103

91104
/// Polls the consumer for new messages.
@@ -681,15 +694,17 @@ where
681694
{
682695
fn drop(&mut self) {
683696
trace!("Destroying consumer: {:?}", self.client.native_ptr());
684-
let err = unsafe {
685-
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
686-
};
687-
if !err.is_null() {
688-
error!("Failed to close the consumer queue on drop");
689-
}
697+
if self.group_id.is_some() {
698+
let err = unsafe {
699+
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
700+
};
701+
if !err.is_null() {
702+
error!("Failed to close the consumer queue on drop");
703+
}
690704

691-
while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
692-
self.poll(Duration::from_millis(100));
705+
while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
706+
self.poll(Duration::from_millis(100));
707+
}
693708
}
694709
trace!("Consumer destroyed: {:?}", self.client.native_ptr());
695710
}

0 commit comments

Comments
 (0)