Skip to content

Commit 85539ac

Browse files
authored
Merge pull request #636 from getsentry/base-consumer
Pass BaseConsumer to ConsumerContext::rebalance
2 parents 6154c84 + 95f6cd8 commit 85539ac

File tree

3 files changed

+17
-13
lines changed

3 files changed

+17
-13
lines changed

examples/simple_consumer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use log::{info, warn};
44
use rdkafka::client::ClientContext;
55
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
66
use rdkafka::consumer::stream_consumer::StreamConsumer;
7-
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
7+
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance};
88
use rdkafka::error::KafkaResult;
99
use rdkafka::message::{Headers, Message};
1010
use rdkafka::topic_partition_list::TopicPartitionList;
@@ -22,11 +22,11 @@ struct CustomContext;
2222
impl ClientContext for CustomContext {}
2323

2424
impl ConsumerContext for CustomContext {
25-
fn pre_rebalance(&self, rebalance: &Rebalance) {
25+
fn pre_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
2626
info!("Pre rebalance {:?}", rebalance);
2727
}
2828

29-
fn post_rebalance(&self, rebalance: &Rebalance) {
29+
fn post_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
3030
info!("Post rebalance {:?}", rebalance);
3131
}
3232

src/consumer/base_consumer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use log::{error, warn};
1111
use rdkafka_sys as rdsys;
1212
use rdkafka_sys::types::*;
1313

14-
use crate::client::{Client, NativeQueue};
14+
use crate::client::{Client, NativeClient, NativeQueue};
1515
use crate::config::{
1616
ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
1717
};
@@ -194,8 +194,7 @@ where
194194
// The TPL is owned by the Event and will be destroyed when the event is destroyed.
195195
// Dropping it here will lead to double free.
196196
let mut tpl = ManuallyDrop::new(tpl);
197-
self.context()
198-
.rebalance(self.client.native_client(), err, &mut tpl);
197+
self.context().rebalance(self, err, &mut tpl);
199198
}
200199
_ => {
201200
let buf = unsafe {
@@ -359,6 +358,10 @@ where
359358
pub fn closed(&self) -> bool {
360359
unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
361360
}
361+
362+
pub(crate) fn native_client(&self) -> &NativeClient {
363+
self.client.native_client()
364+
}
362365
}
363366

364367
impl<C> Consumer<C> for BaseConsumer<C>

src/consumer/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::Duration;
77
use rdkafka_sys as rdsys;
88
use rdkafka_sys::types::*;
99

10-
use crate::client::{Client, ClientContext, NativeClient};
10+
use crate::client::{Client, ClientContext};
1111
use crate::error::{KafkaError, KafkaResult};
1212
use crate::groups::GroupList;
1313
use crate::log::{error, trace};
@@ -43,15 +43,15 @@ pub enum Rebalance<'a> {
4343
/// be specified.
4444
///
4545
/// See also the [`ClientContext`] trait.
46-
pub trait ConsumerContext: ClientContext {
46+
pub trait ConsumerContext: ClientContext + Sized {
4747
/// Implements the default rebalancing strategy and calls the
4848
/// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
4949
/// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
5050
/// method is overridden, it will be responsibility of the user to call them
5151
/// if needed.
5252
fn rebalance(
5353
&self,
54-
native_client: &NativeClient,
54+
base_consumer: &BaseConsumer<Self>,
5555
err: RDKafkaRespErr,
5656
tpl: &mut TopicPartitionList,
5757
) {
@@ -66,9 +66,10 @@ pub trait ConsumerContext: ClientContext {
6666
};
6767

6868
trace!("Running pre-rebalance with {:?}", rebalance);
69-
self.pre_rebalance(&rebalance);
69+
self.pre_rebalance(base_consumer, &rebalance);
7070

7171
trace!("Running rebalance with {:?}", rebalance);
72+
let native_client = base_consumer.native_client();
7273
// Execute rebalance
7374
unsafe {
7475
match err {
@@ -93,18 +94,18 @@ pub trait ConsumerContext: ClientContext {
9394
}
9495
}
9596
trace!("Running post-rebalance with {:?}", rebalance);
96-
self.post_rebalance(&rebalance);
97+
self.post_rebalance(base_consumer, &rebalance);
9798
}
9899

99100
/// Pre-rebalance callback. This method will run before the rebalance and
100101
/// should terminate its execution quickly.
101102
#[allow(unused_variables)]
102-
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
103+
fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
103104

104105
/// Post-rebalance callback. This method will run after the rebalance and
105106
/// should terminate its execution quickly.
106107
#[allow(unused_variables)]
107-
fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
108+
fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
108109

109110
// TODO: convert pointer to structure
110111
/// Post commit callback. This method will run after a group of offsets was

0 commit comments

Comments
 (0)