Skip to content

Commit 4ef3b98

Browse files
committed
core, tests: Remove IndexingContext.filter
There's no real reason to hang on to the filter after the block stream has been constructed
1 parent c57fc21 commit 4ef3b98

File tree

3 files changed

+7
-13
lines changed

3 files changed

+7
-13
lines changed

core/src/subgraph/context/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::polling_monitor::{
66
use anyhow::{self, Error};
77
use bytes::Bytes;
88
use graph::{
9-
blockchain::{Blockchain, TriggerFilterWrapper},
9+
blockchain::Blockchain,
1010
components::{store::DeploymentId, subgraph::HostMetrics},
1111
data::subgraph::SubgraphManifest,
1212
data_source::{
@@ -74,7 +74,6 @@ where
7474
pub(crate) instance: SubgraphInstance<C, T>,
7575
pub instances: SubgraphKeepAlive,
7676
pub offchain_monitor: OffchainMonitor,
77-
pub filter: Option<TriggerFilterWrapper<C>>,
7877
pub(crate) trigger_processor: Box<dyn TriggerProcessor<C, T>>,
7978
pub(crate) decoder: Box<Decoder<C, T>>,
8079
}
@@ -101,7 +100,6 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
101100
instance,
102101
instances,
103102
offchain_monitor,
104-
filter: None,
105103
trigger_processor,
106104
decoder,
107105
}

core/src/subgraph/runner/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,11 @@ where
223223
let block_stream_canceler = CancelGuard::new();
224224
let block_stream_cancel_handle = block_stream_canceler.handle();
225225
// TriggerFilter needs to be rebuilt eveytime the blockstream is restarted
226-
self.ctx.filter = Some(self.build_filter());
226+
let filter = self.build_filter();
227227

228-
let block_stream = new_block_stream(
229-
&self.inputs,
230-
self.ctx.filter.clone().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line
231-
&self.metrics.subgraph,
232-
)
233-
.await?
234-
.cancelable(&block_stream_canceler);
228+
let block_stream = new_block_stream(&self.inputs, filter, &self.metrics.subgraph)
229+
.await?
230+
.cancelable(&block_stream_canceler);
235231

236232
self.cancel_handle = Some(block_stream_cancel_handle);
237233

@@ -262,7 +258,7 @@ where
262258
/// Returns the next state to transition to:
263259
/// - `Restarting` to start the block stream (normal case)
264260
/// - `Stopped` if the max end block was already reached
265-
async fn initialize(&mut self) -> Result<RunnerState<C>, SubgraphRunnerError> {
261+
async fn initialize(&self) -> Result<RunnerState<C>, SubgraphRunnerError> {
266262
self.update_deployment_synced_metric();
267263

268264
// If a subgraph failed for deterministic reasons, before start indexing, we first

tests/tests/runner_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ async fn end_block() -> anyhow::Result<()> {
448448
) {
449449
let runner = ctx.runner(block_ptr.clone()).await;
450450
let runner = runner.run_for_test(false).await.unwrap();
451-
let filter = runner.context().filter.as_ref().unwrap();
451+
let filter = runner.build_filter_for_test();
452452
let addresses = filter
453453
.chain_filter
454454
.log()

0 commit comments

Comments
 (0)