Skip to content

Commit c811175

Browse files
committed
Pass BaseConsumer to ConsumerContext::rebalance
1 parent 0c5c131 commit c811175

File tree

2 files changed

+14
-10
lines changed

2 files changed

+14
-10
lines changed

src/consumer/base_consumer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use log::{error, warn};
1111
use rdkafka_sys as rdsys;
1212
use rdkafka_sys::types::*;
1313

14-
use crate::client::{Client, NativeQueue};
14+
use crate::client::{Client, NativeClient, NativeQueue};
1515
use crate::config::{
1616
ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
1717
};
@@ -188,8 +188,7 @@ where
188188
// The TPL is owned by the Event and will be destroyed when the event is destroyed.
189189
// Dropping it here will lead to double free.
190190
let mut tpl = ManuallyDrop::new(tpl);
191-
self.context()
192-
.rebalance(self.client.native_client(), err, &mut tpl);
191+
self.context().rebalance(self, err, &mut tpl);
193192
}
194193
_ => {
195194
let buf = unsafe {
@@ -353,6 +352,10 @@ where
353352
pub fn closed(&self) -> bool {
354353
unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
355354
}
355+
356+
pub(crate) fn native_client(&self) -> &NativeClient {
357+
self.client.native_client()
358+
}
356359
}
357360

358361
impl<C> Consumer<C> for BaseConsumer<C>

src/consumer/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::Duration;
77
use rdkafka_sys as rdsys;
88
use rdkafka_sys::types::*;
99

10-
use crate::client::{Client, ClientContext, NativeClient};
10+
use crate::client::{Client, ClientContext};
1111
use crate::error::{KafkaError, KafkaResult};
1212
use crate::groups::GroupList;
1313
use crate::log::{error, trace};
@@ -43,15 +43,15 @@ pub enum Rebalance<'a> {
4343
/// be specified.
4444
///
4545
/// See also the [`ClientContext`] trait.
46-
pub trait ConsumerContext: ClientContext {
46+
pub trait ConsumerContext: ClientContext + Sized {
4747
/// Implements the default rebalancing strategy and calls the
4848
/// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
4949
/// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
5050
/// method is overridden, it will be responsibility of the user to call them
5151
/// if needed.
5252
fn rebalance(
5353
&self,
54-
native_client: &NativeClient,
54+
base_consumer: &BaseConsumer<Self>,
5555
err: RDKafkaRespErr,
5656
tpl: &mut TopicPartitionList,
5757
) {
@@ -66,9 +66,10 @@ pub trait ConsumerContext: ClientContext {
6666
};
6767

6868
trace!("Running pre-rebalance with {:?}", rebalance);
69-
self.pre_rebalance(&rebalance);
69+
self.pre_rebalance(base_consumer, &rebalance);
7070

7171
trace!("Running rebalance with {:?}", rebalance);
72+
let native_client = base_consumer.native_client();
7273
// Execute rebalance
7374
unsafe {
7475
match err {
@@ -93,18 +94,18 @@ pub trait ConsumerContext: ClientContext {
9394
}
9495
}
9596
trace!("Running post-rebalance with {:?}", rebalance);
96-
self.post_rebalance(&rebalance);
97+
self.post_rebalance(base_consumer, &rebalance);
9798
}
9899

99100
/// Pre-rebalance callback. This method will run before the rebalance and
100101
/// should terminate its execution quickly.
101102
#[allow(unused_variables)]
102-
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
103+
fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
103104

104105
/// Post-rebalance callback. This method will run after the rebalance and
105106
/// should terminate its execution quickly.
106107
#[allow(unused_variables)]
107-
fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
108+
fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
108109

109110
// TODO: convert pointer to structure
110111
/// Post commit callback. This method will run after a group of offsets was

0 commit comments

Comments
 (0)