Skip to content

Commit 33d72d6

Browse files
authored
chore(rpc): subscription kind (#277)
1 parent 08bd601 commit 33d72d6

File tree

3 files changed

+45
-26
lines changed

3 files changed

+45
-26
lines changed

crates/rpc/src/base/pubsub.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use serde::Serialize;
2424
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
2525
use tracing::error;
2626

27-
use crate::ExtendedSubscriptionKind;
27+
use crate::{BaseSubscriptionKind, ExtendedSubscriptionKind};
2828

2929
/// Eth pub-sub RPC extension for flashblocks and standard subscriptions.
3030
///
@@ -138,17 +138,21 @@ where
138138
}
139139

140140
// Handle flashblocks-specific subscriptions
141+
let ExtendedSubscriptionKind::Base(base_kind) = kind else {
142+
unreachable!("Standard subscription types should be delegated to inner");
143+
};
144+
141145
let sink = pending.accept().await?;
142146

143-
match kind {
144-
ExtendedSubscriptionKind::NewFlashblocks => {
147+
match base_kind {
148+
BaseSubscriptionKind::NewFlashblocks => {
145149
let stream = Self::new_flashblocks_stream(Arc::clone(&self.flashblocks_state));
146150

147151
tokio::spawn(async move {
148152
pipe_from_stream(sink, stream).await;
149153
});
150154
}
151-
ExtendedSubscriptionKind::PendingLogs => {
155+
BaseSubscriptionKind::PendingLogs => {
152156
// Extract filter from params, default to empty filter (match all)
153157
let filter = match params {
154158
Some(Params::Logs(filter)) => *filter,
@@ -161,8 +165,6 @@ where
161165
pipe_from_stream(sink, stream).await;
162166
});
163167
}
164-
// Standard types are handled above, this branch is unreachable
165-
_ => unreachable!("Standard subscription types should be delegated to inner"),
166168
}
167169

168170
Ok(())

crates/rpc/src/base/types.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,36 @@ pub struct TransactionStatusResponse {
2222
/// Extended subscription kind that includes both standard Ethereum subscription types
2323
/// and flashblocks-specific types.
2424
///
25-
/// This enum wraps the standard `SubscriptionKind` from alloy and adds flashblocks support,
26-
/// allowing `eth_subscribe` to handle both standard subscriptions (newHeads, logs, etc.)
25+
/// This enum encapsulates the standard [`SubscriptionKind`] from alloy and adds flashblocks
26+
/// support, allowing `eth_subscribe` to handle both standard subscriptions (newHeads, logs, etc.)
2727
/// and custom flashblocks subscriptions.
28+
///
29+
/// By encapsulating [`SubscriptionKind`] rather than redefining its variants, we automatically
30+
/// inherit support for any new variants added upstream, or get a compile error if the signature
31+
/// changes.
2832
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
29-
#[serde(rename_all = "camelCase")]
33+
#[serde(untagged)]
3034
pub enum ExtendedSubscriptionKind {
31-
/// New block headers subscription (standard).
32-
NewHeads,
33-
/// Logs subscription (standard).
34-
Logs,
35-
/// New pending transactions subscription (standard).
36-
NewPendingTransactions,
37-
/// Node syncing status subscription (standard).
38-
Syncing,
39-
/// New flashblocks subscription (Base-specific).
35+
/// Standard Ethereum subscription types (newHeads, logs, newPendingTransactions, syncing).
36+
///
37+
/// These are proxied to reth's underlying `EthPubSub` implementation.
38+
Standard(SubscriptionKind),
39+
/// Base-specific subscription types for flashblocks.
40+
Base(BaseSubscriptionKind),
41+
}
42+
43+
/// Base-specific subscription types for flashblocks.
44+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
45+
#[serde(rename_all = "camelCase")]
46+
pub enum BaseSubscriptionKind {
47+
/// New flashblocks subscription.
4048
///
4149
/// Fires a notification each time a new flashblock is processed, providing the current
4250
/// pending block state. Each flashblock represents an incremental update to the pending
4351
/// block, so multiple notifications may be emitted for the same block height as new
4452
/// flashblocks arrive.
4553
NewFlashblocks,
46-
/// Pending logs subscription (Base-specific).
54+
/// Pending logs subscription.
4755
///
4856
/// Returns logs from flashblocks pending state that match the given filter criteria.
4957
/// Unlike standard `logs` subscription which only includes logs from confirmed blocks,
@@ -55,16 +63,25 @@ impl ExtendedSubscriptionKind {
5563
/// Returns the standard subscription kind if this is a standard subscription type.
5664
pub const fn as_standard(&self) -> Option<SubscriptionKind> {
5765
match self {
58-
Self::NewHeads => Some(SubscriptionKind::NewHeads),
59-
Self::Logs => Some(SubscriptionKind::Logs),
60-
Self::NewPendingTransactions => Some(SubscriptionKind::NewPendingTransactions),
61-
Self::Syncing => Some(SubscriptionKind::Syncing),
62-
Self::NewFlashblocks | Self::PendingLogs => None,
66+
Self::Standard(kind) => Some(*kind),
67+
Self::Base(_) => None,
6368
}
6469
}
6570

6671
/// Returns true if this is a flashblocks-specific subscription.
6772
pub const fn is_flashblocks(&self) -> bool {
68-
matches!(self, Self::NewFlashblocks | Self::PendingLogs)
73+
matches!(self, Self::Base(_))
74+
}
75+
}
76+
77+
impl From<SubscriptionKind> for ExtendedSubscriptionKind {
78+
fn from(kind: SubscriptionKind) -> Self {
79+
Self::Standard(kind)
80+
}
81+
}
82+
83+
impl From<BaseSubscriptionKind> for ExtendedSubscriptionKind {
84+
fn from(kind: BaseSubscriptionKind) -> Self {
85+
Self::Base(kind)
6986
}
7087
}

crates/rpc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub use base::{
1313
pubsub::{EthPubSub, EthPubSubApiServer},
1414
traits::{MeteringApiServer, TransactionStatusApiServer},
1515
transaction_rpc::TransactionStatusApiImpl,
16-
types::{ExtendedSubscriptionKind, Status, TransactionStatusResponse},
16+
types::{BaseSubscriptionKind, ExtendedSubscriptionKind, Status, TransactionStatusResponse},
1717
};
1818

1919
mod eth;

0 commit comments

Comments
 (0)