
Commit c381fa9 (parent 856ebbc)
Wire Kafka consumer and ResourceAnnotator into runner
Completes the metering pipeline by:

- Creating MeteringRuntime to hold cache, estimator, and channels
- Spawning a ResourceAnnotator task to correlate transactions with flashblocks
- Spawning a KafkaBundleConsumer task to consume AcceptedBundle events
- Adding CompositeFlashblocksReceiver that forwards to both FlashblocksState and the metering pipeline via FlashblockInclusion events
- Adding a flashblock_inclusion_from_flashblock helper to extract tx hashes
- Supporting Kafka properties file loading

The metering cache is now populated when:

1. The Kafka consumer receives AcceptedBundle events (transaction data)
2. The FlashblocksSubscriber receives flashblocks (inclusion position)
3. The ResourceAnnotator correlates both to update the cache
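The ResourceAnnotator itself lives in base_reth_rpc and its internals are not part of this diff. As a rough sketch only of the correlation in step 3 above, the loop amounts to fanning in the two unbounded channels and updating a shared cache keyed by transaction hash; the types below are simplified stand-ins, not the real MeteredTransaction, FlashblockInclusion, or MeteringCache.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::sync::mpsc;

// Simplified stand-ins for the real types defined in base_reth_rpc.
struct MeteredTx {
    hash: [u8; 32],
    execution_time_us: u128,
}

struct Inclusion {
    block_number: u64,
    flashblock_index: u64,
    ordered_tx_hashes: Vec<[u8; 32]>,
}

/// Cache entry: measured usage plus, once known, the (block, flashblock) position.
type Cache = Arc<RwLock<HashMap<[u8; 32], (MeteredTx, Option<(u64, u64)>)>>>;

/// Fan in both event streams and correlate them by transaction hash.
async fn run_annotator(
    cache: Cache,
    mut tx_rx: mpsc::UnboundedReceiver<MeteredTx>,
    mut fb_rx: mpsc::UnboundedReceiver<Inclusion>,
) {
    loop {
        tokio::select! {
            // Kafka side: record the transaction's measured resource usage.
            Some(tx) = tx_rx.recv() => {
                cache.write().insert(tx.hash, (tx, None));
            }
            // Flashblocks side: annotate cached transactions with their inclusion position.
            Some(inc) = fb_rx.recv() => {
                let mut guard = cache.write();
                for hash in &inc.ordered_tx_hashes {
                    if let Some(entry) = guard.get_mut(hash) {
                        entry.1 = Some((inc.block_number, inc.flashblock_index));
                    }
                }
            }
            // Both senders dropped: nothing left to correlate.
            else => break,
        }
    }
}

Unbounded channels keep the flashblock hot path from blocking on the metering side, and the loop exits once both senders have been dropped.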
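On the properties-file support: load_kafka_config_from_file in crates/runner/src/extensions/rpc.rs skips blank lines and '#' comments and splits each remaining line on the first '='. A hypothetical file could look like the following; the keys are standard librdkafka settings and the values are placeholders, not anything defined by this commit.

# Extra librdkafka properties merged into the consumer config (placeholder values)
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=metering-consumer
sasl.password=change-me
auto.offset.reset=latest

Because the file is applied after the hard-coded defaults, repeating a key such as auto.offset.reset overrides the value set in code.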

File tree

3 files changed: +206 -35 lines changed


Cargo.lock

Lines changed: 3 additions & 0 deletions

crates/runner/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,9 @@ reth-optimism-chainspec.workspace = true
 # alloy
 alloy-primitives.workspace = true
 
+# flashblocks
+base-flashtypes.workspace = true
+
 # misc
 eyre.workspace = true
 futures-util.workspace = true
@@ -35,3 +38,5 @@ tracing.workspace = true
 url.workspace = true
 parking_lot.workspace = true
 derive_more = { workspace = true, features = ["debug"] }
+rdkafka.workspace = true
+tokio.workspace = true

crates/runner/src/extensions/rpc.rs

Lines changed: 198 additions & 35 deletions
@@ -2,22 +2,115 @@
 
 use std::sync::Arc;
 
-use alloy_primitives::U256;
-use base_reth_flashblocks::{FlashblocksState, FlashblocksSubscriber};
+use alloy_primitives::{B256, U256, keccak256};
+use base_flashtypes::Flashblock;
+use base_reth_flashblocks::{FlashblocksReceiver, FlashblocksState, FlashblocksSubscriber};
 use base_reth_rpc::{
-    EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, MeteringApiImpl,
-    MeteringApiServer, MeteringCache, PriorityFeeEstimator, ResourceLimits,
+    EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblockInclusion,
+    KafkaBundleConsumer, KafkaBundleConsumerConfig, MeteredTransaction, MeteringApiImpl,
+    MeteringApiServer, MeteringCache, PriorityFeeEstimator, ResourceAnnotator, ResourceLimits,
     TransactionStatusApiImpl, TransactionStatusApiServer,
 };
 use parking_lot::RwLock;
-use tracing::info;
+use rdkafka::ClientConfig;
+use tokio::sync::mpsc;
+use tracing::{error, info, warn};
 use url::Url;
 
 use crate::{
     BaseNodeConfig, FlashblocksConfig, MeteringConfig,
     extensions::{BaseNodeExtension, ConfigurableBaseNodeExtension, FlashblocksCell, OpBuilder},
 };
 
+/// Runtime state for the metering pipeline.
+#[derive(Clone)]
+struct MeteringRuntime {
+    /// Shared cache for metered transactions.
+    cache: Arc<RwLock<MeteringCache>>,
+    /// Priority fee estimator.
+    estimator: Arc<PriorityFeeEstimator>,
+    /// Sender for metered transactions from Kafka.
+    tx_sender: mpsc::UnboundedSender<MeteredTransaction>,
+    /// Sender for flashblock inclusions.
+    flashblock_sender: mpsc::UnboundedSender<FlashblockInclusion>,
+}
+
+/// Composite receiver that forwards flashblocks to both FlashblocksState and the metering pipeline.
+struct CompositeFlashblocksReceiver<Client> {
+    state: Arc<FlashblocksState<Client>>,
+    /// Optional channel for the metering pipeline; flashblocks RPC still needs the stream even
+    /// when metering is disabled, so we only forward inclusions if a sender is provided.
+    metering_sender: Option<mpsc::UnboundedSender<FlashblockInclusion>>,
+}
+
+impl<Client> CompositeFlashblocksReceiver<Client> {
+    fn new(
+        state: Arc<FlashblocksState<Client>>,
+        metering_sender: Option<mpsc::UnboundedSender<FlashblockInclusion>>,
+    ) -> Self {
+        Self { state, metering_sender }
+    }
+}
+
+impl<Client> FlashblocksReceiver for CompositeFlashblocksReceiver<Client>
+where
+    FlashblocksState<Client>: FlashblocksReceiver,
+{
+    fn on_flashblock_received(&self, flashblock: Flashblock) {
+        // Forward to the state first
+        self.state.on_flashblock_received(flashblock.clone());
+
+        // Then forward to metering if enabled
+        let Some(sender) = &self.metering_sender else {
+            return;
+        };
+        let Some(inclusion) = flashblock_inclusion_from_flashblock(&flashblock) else {
+            return;
+        };
+
+        if sender.send(inclusion).is_err() {
+            warn!(
+                target: "metering::flashblocks",
+                "Failed to forward flashblock inclusion to metering"
+            );
+        }
+    }
+}
+
+/// Converts a flashblock to a FlashblockInclusion for the metering pipeline.
+fn flashblock_inclusion_from_flashblock(flashblock: &Flashblock) -> Option<FlashblockInclusion> {
+    if flashblock.diff.transactions.is_empty() {
+        return None;
+    }
+
+    let ordered_tx_hashes: Vec<B256> =
+        flashblock.diff.transactions.iter().map(|tx_bytes| keccak256(tx_bytes)).collect();
+
+    Some(FlashblockInclusion {
+        block_number: flashblock.metadata.block_number,
+        flashblock_index: flashblock.index,
+        ordered_tx_hashes,
+    })
+}
+
+/// Loads Kafka configuration from a properties file.
+fn load_kafka_config_from_file(
+    path: &str,
+) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
+    let content = std::fs::read_to_string(path)?;
+    let mut props = Vec::new();
+    for line in content.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            props.push((key.trim().to_string(), value.trim().to_string()));
+        }
+    }
+    Ok(props)
+}
+
 /// Helper struct that wires the custom RPC modules into the node builder.
 #[derive(Debug, Clone)]
 pub struct BaseRpcExtension {
@@ -52,38 +145,102 @@ impl BaseNodeExtension for BaseRpcExtension
         let sequencer_rpc = self.sequencer_rpc.clone();
 
         builder.extend_rpc_modules(move |ctx| {
-            if metering.enabled {
-                info!(message = "Starting Metering RPC");
-
-                // Create priority fee estimator if configured
-                let estimator = if metering.kafka.is_some() {
-                    info!(message = "Enabling priority fee estimation");
-                    let cache = Arc::new(RwLock::new(MeteringCache::new(metering.cache_size)));
-                    let limits = ResourceLimits {
-                        gas_used: Some(metering.resource_limits.gas_limit),
-                        execution_time_us: Some(metering.resource_limits.execution_time_us as u128),
-                        state_root_time_us: metering
-                            .resource_limits
-                            .state_root_time_us
-                            .map(|v| v as u128),
-                        data_availability_bytes: Some(metering.resource_limits.da_bytes),
-                    };
-                    let default_fee = U256::from(metering.uncongested_priority_fee);
-                    let estimator = Arc::new(PriorityFeeEstimator::new(
-                        cache,
-                        metering.priority_fee_percentile,
-                        limits,
-                        default_fee,
-                        None, // Dynamic DA config not wired yet
-                    ));
-                    Some(estimator)
-                } else {
-                    None
+            // Set up metering runtime if enabled with Kafka
+            let metering_runtime = if metering.enabled && metering.kafka.is_some() {
+                info!(message = "Starting Metering RPC with priority fee estimation");
+
+                let cache = Arc::new(RwLock::new(MeteringCache::new(metering.cache_size)));
+                let limits = ResourceLimits {
+                    gas_used: Some(metering.resource_limits.gas_limit),
+                    execution_time_us: Some(metering.resource_limits.execution_time_us as u128),
+                    state_root_time_us: metering
+                        .resource_limits
+                        .state_root_time_us
+                        .map(|v| v as u128),
+                    data_availability_bytes: Some(metering.resource_limits.da_bytes),
                 };
+                let default_fee = U256::from(metering.uncongested_priority_fee);
+                let estimator = Arc::new(PriorityFeeEstimator::new(
+                    cache.clone(),
+                    metering.priority_fee_percentile,
+                    limits,
+                    default_fee,
+                    None, // Dynamic DA config not wired yet
+                ));
+
+                // Create channels for the annotator
+                let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<MeteredTransaction>();
+                let (flashblock_sender, flashblock_receiver) =
+                    mpsc::unbounded_channel::<FlashblockInclusion>();
+
+                // Spawn the resource annotator
+                let annotator_cache = cache.clone();
+                tokio::spawn(async move {
+                    ResourceAnnotator::new(annotator_cache, tx_receiver, flashblock_receiver)
+                        .run()
+                        .await;
+                });
+
+                Some(MeteringRuntime { cache, estimator, tx_sender, flashblock_sender })
+            } else {
+                None
+            };
 
-                let metering_api = estimator.map_or_else(
+            // Spawn Kafka consumer if configured
+            if let (Some(runtime), Some(kafka_cfg)) = (&metering_runtime, &metering.kafka) {
+                info!(message = "Starting Kafka consumer for metering");
+
+                let mut client_config = ClientConfig::new();
+                client_config.set("bootstrap.servers", &kafka_cfg.brokers);
+                client_config.set("group.id", &kafka_cfg.group_id);
+                client_config.set("enable.partition.eof", "false");
+                client_config.set("session.timeout.ms", "6000");
+                client_config.set("enable.auto.commit", "true");
+                client_config.set("auto.offset.reset", "earliest");
+
+                if let Some(path) = kafka_cfg.properties_file.as_ref() {
+                    match load_kafka_config_from_file(path) {
+                        Ok(props) => {
+                            for (key, value) in props {
+                                client_config.set(key, value);
+                            }
+                        }
+                        Err(err) => {
+                            warn!(
+                                message = "Failed to load Kafka properties file",
+                                file = %path,
+                                %err
+                            );
+                        }
+                    }
+                }
+
+                let tx_sender = runtime.tx_sender.clone();
+                let topic = kafka_cfg.topic.clone();
+                tokio::spawn(async move {
+                    let config = KafkaBundleConsumerConfig { client_config, topic };
+
+                    match KafkaBundleConsumer::new(config, tx_sender) {
+                        Ok(consumer) => consumer.run().await,
+                        Err(err) => error!(
+                            target: "metering::kafka",
+                            %err,
+                            "Failed to initialize Kafka consumer"
+                        ),
+                    }
+                });
+            }
+
+            // Register metering RPC
+            if metering.enabled {
+                let metering_api = metering_runtime.as_ref().map_or_else(
                     || MeteringApiImpl::new(ctx.provider().clone()),
-                    |est| MeteringApiImpl::with_estimator(ctx.provider().clone(), est),
+                    |rt| {
+                        MeteringApiImpl::with_estimator(
+                            ctx.provider().clone(),
+                            rt.estimator.clone(),
+                        )
+                    },
                 );
                 ctx.modules.merge_configured(metering_api.into_rpc())?;
             }
@@ -107,7 +264,13 @@ impl BaseNodeExtension for BaseRpcExtension
                 .clone();
             fb.start();
 
-            let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url);
+            // Create composite receiver that forwards to both flashblocks state and metering
+            let metering_sender =
+                metering_runtime.as_ref().map(|rt| rt.flashblock_sender.clone());
+            let receiver =
+                Arc::new(CompositeFlashblocksReceiver::new(fb.clone(), metering_sender));
+
+            let mut flashblocks_client = FlashblocksSubscriber::new(receiver, ws_url);
             flashblocks_client.start();
 
             let api_ext = EthApiExt::new(
