From b60148dce2a6e68f4da097543a0c242b6aae9c9a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 16 Feb 2026 12:54:02 -0800 Subject: [PATCH 1/2] all: Add AMP chain name aliases to config AMP-powered subgraphs use different network names than graph-node's internal chain names (e.g., AMP uses "ethereum-mainnet" while graph-node uses "mainnet"). Add a config-level `amp` field on chains that maps AMP names to internal names, so AMP manifests work without renaming chains in the database. --- core/src/subgraph/registrar.rs | 12 +- docs/config.md | 4 + graph/src/components/network_provider/mod.rs | 50 +++++ node/resources/tests/full_config.toml | 1 + node/src/config.rs | 192 ++++++++++++++++++- node/src/launcher.rs | 5 + node/src/manager/commands/run.rs | 2 + tests/src/fixture/mod.rs | 1 + 8 files changed, 263 insertions(+), 4 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index a60f2b01b03..9b0efe21abc 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -5,6 +5,7 @@ use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; use graph::components::{ link_resolver::LinkResolverContext, + network_provider::AmpChainNames, store::{DeploymentId, DeploymentLocator, SubscriptionManager}, subgraph::Settings, }; @@ -30,6 +31,7 @@ pub struct SubgraphRegistrar { version_switching_mode: SubgraphVersionSwitchingMode, assignment_event_stream_cancel_guard: CancelGuard, // cancels on drop settings: Arc, + amp_chain_names: Arc, } impl SubgraphRegistrar @@ -50,6 +52,7 @@ where node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, settings: Arc, + amp_chain_names: Arc, ) -> Self { let logger = logger_factory.component_logger("SubgraphRegistrar", None); let logger_factory = logger_factory.with_parent(logger.clone()); @@ -69,6 +72,7 @@ where version_switching_mode, assignment_event_stream_cancel_guard: CancelGuard::new(), settings, + amp_chain_names, } } @@ -314,6 
+318,7 @@ where &resolver, self.amp_client.cheap_clone(), history_blocks, + &self.amp_chain_names, ) .await? } @@ -333,6 +338,7 @@ where &resolver, self.amp_client.cheap_clone(), history_blocks, + &self.amp_chain_names, ) .await? } @@ -460,6 +466,7 @@ async fn create_subgraph_version, amp_client: Option>, history_blocks_override: Option, + amp_chain_names: &AmpChainNames, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); @@ -488,9 +495,10 @@ async fn create_subgraph_version(network_name.clone()) + .get::(resolved_name.clone()) .map_err(SubgraphRegistrarError::NetworkNotSupported)? .cheap_clone(); @@ -570,7 +578,7 @@ async fn create_subgraph_version); + +impl AmpChainNames { + pub fn new(mapping: HashMap) -> Self { + AmpChainNames(mapping) + } + + /// Returns the internal chain name for an AMP alias, or the input + /// unchanged if no alias matches. + pub fn resolve(&self, name: &ChainName) -> ChainName { + self.0.get(name).cloned().unwrap_or_else(|| name.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn amp_chain_names_resolve_known_alias() { + let mut map = HashMap::new(); + map.insert( + ChainName::from("ethereum-mainnet"), + ChainName::from("mainnet"), + ); + let names = AmpChainNames::new(map); + assert_eq!( + names.resolve(&ChainName::from("ethereum-mainnet")), + ChainName::from("mainnet") + ); + } + + #[test] + fn amp_chain_names_resolve_unknown_passthrough() { + let names = AmpChainNames::default(); + assert_eq!( + names.resolve(&ChainName::from("mainnet")), + ChainName::from("mainnet") + ); + } +} diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 69cc0bbe02f..88ba990c1e6 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -45,6 +45,7 @@ ingestor = "index_0" [chains.mainnet] shard = "primary" +amp = "ethereum-mainnet" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { 
label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, diff --git a/node/src/config.rs b/node/src/config.rs index 9751844535f..69c174e5a76 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,7 +1,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, - components::network_provider::ChainName, + components::network_provider::{AmpChainNames, ChainName}, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -23,7 +23,7 @@ use graph_store_postgres::{DeploymentPlacer, Shard as ShardName, PRIMARY_SHARD}; use graph::http::{HeaderMap, Uri}; use serde::Serialize; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, fmt, }; use std::{fs::read_to_string, time::Duration}; @@ -112,6 +112,24 @@ impl Config { .collect() } + /// Build an `AmpChainNames` mapping from the config. Chains with an + /// explicit `amp` name map that name to the chain name; chains without + /// use identity mapping. + pub fn amp_chain_names(&self) -> AmpChainNames { + let mapping: HashMap = self + .chains + .chains + .iter() + .map(|(chain_name, chain)| { + let amp_name: ChainName = + chain.amp.as_deref().unwrap_or(chain_name.as_str()).into(); + let internal_name: ChainName = chain_name.as_str().into(); + (amp_name, internal_name) + }) + .collect(); + AmpChainNames::new(mapping) + } + /// Check that the config is valid. fn validate(&mut self) -> Result<()> { if !self.stores.contains_key(PRIMARY_SHARD.as_str()) { @@ -430,6 +448,40 @@ impl ChainSection { for (_, chain) in self.chains.iter_mut() { chain.validate()? } + + // Validate that effective AMP names are unique and don't collide + // with other chain names. 
+ let mut amp_names: BTreeMap = BTreeMap::new(); + for (chain_name, chain) in &self.chains { + let effective = chain.amp.as_deref().unwrap_or(chain_name.as_str()); + if let Some(prev_chain) = amp_names.get(effective) { + return Err(anyhow!( + "duplicate AMP name `{}`: used by chains `{}` and `{}`", + effective, + prev_chain, + chain_name + )); + } + // Check that an explicit amp alias doesn't collide with + // another chain's own name (which would be ambiguous). + if chain.amp.is_some() { + if let Some(other) = self.chains.get(effective) { + // Only a collision if the other chain doesn't also + // set the same amp alias (which is covered by the + // duplicate check above). + if other.amp.as_deref() != Some(effective) { + return Err(anyhow!( + "AMP alias `{}` on chain `{}` collides with chain `{}`", + effective, + chain_name, + effective, + )); + } + } + } + amp_names.insert(effective.to_string(), chain_name.clone()); + } + Ok(()) } @@ -523,6 +575,7 @@ impl ChainSection { protocol: BlockchainKind::Ethereum, polling_interval: default_polling_interval(), providers: vec![], + amp: None, }); entry.providers.push(provider); } @@ -543,6 +596,10 @@ pub struct Chain { pub polling_interval: Duration, #[serde(rename = "provider")] pub providers: Vec, + /// AMP network name alias. When set, AMP manifests using this name will + /// resolve to this chain. Defaults to the chain name. 
+ #[serde(default)] + pub amp: Option, } fn default_blockchain_kind() -> BlockchainKind { @@ -1276,6 +1333,7 @@ mod tests { protocol: BlockchainKind::Ethereum, polling_interval: default_polling_interval(), providers: vec![], + amp: None, }, actual ); @@ -1298,6 +1356,7 @@ mod tests { protocol: BlockchainKind::Near, polling_interval: default_polling_interval(), providers: vec![], + amp: None, }, actual ); @@ -1902,4 +1961,133 @@ fdw_pool_size = [ assert_eq!(shard.fdw_pool_size.size_for(&query, "ashard").unwrap(), 5); assert_eq!(shard.fdw_pool_size.size_for(&other, "ashard").unwrap(), 5); } + + #[test] + fn amp_chain_names_parsed_from_toml() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + amp = "ethereum-mainnet" + "#, + ) + .unwrap(); + + assert_eq!(actual.amp, Some("ethereum-mainnet".to_string())); + } + + #[test] + fn amp_chain_names_default_when_absent() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + "#, + ) + .unwrap(); + + assert_eq!(actual.amp, None); + } + + #[test] + fn amp_chain_names_validation_rejects_duplicate_effective_names() { + let mut section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + amp = "eth" + [sepolia] + shard = "primary" + provider = [] + amp = "eth" + "#, + ) + .unwrap(); + + let err = section.validate(); + assert!(err.is_err()); + assert!( + err.unwrap_err().to_string().contains("duplicate AMP name"), + "expected duplicate AMP name error" + ); + } + + #[test] + fn amp_chain_names_validation_rejects_alias_colliding_with_chain_name() { + let mut section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [sepolia] + shard = "primary" + provider = [] + amp = "mainnet" + "#, + ) + .unwrap(); + + let err = section.validate(); + assert!(err.is_err()); + let msg = err.unwrap_err().to_string(); + // The alias "mainnet" on sepolia collides with the 
chain named + // "mainnet" whose effective AMP name is also "mainnet". + assert!( + msg.contains("duplicate AMP name") || msg.contains("collides with chain"), + "expected collision/duplicate error, got: {msg}" + ); + } + + #[test] + fn amp_chain_names_builds_correct_mapping() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + amp = "ethereum-mainnet" + [sepolia] + shard = "primary" + provider = [] + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let amp = config.amp_chain_names(); + // Explicit alias resolves + assert_eq!( + amp.resolve(&"ethereum-mainnet".into()), + graph::components::network_provider::ChainName::from("mainnet") + ); + // Identity for chain without alias + assert_eq!( + amp.resolve(&"sepolia".into()), + graph::components::network_provider::ChainName::from("sepolia") + ); + // Unknown name passes through + assert_eq!( + amp.resolve(&"unknown".into()), + graph::components::network_provider::ChainName::from("unknown") + ); + } } diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 9c0bef19e44..06f9b8cf652 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -18,6 +18,7 @@ use graph::url::Url; use graph::{ amp, blockchain::{Blockchain, BlockchainKind, BlockchainMap}, + components::network_provider::AmpChainNames, }; use graph_core::polling_monitor::{arweave_service, ArweaveService, IpfsService}; use graph_graphql::prelude::GraphQlRunner; @@ -276,6 +277,7 @@ fn build_subgraph_registrar( ipfs_service: IpfsService, amp_client: Option>, cancel_token: CancellationToken, + amp_chain_names: Arc, ) -> 
Arc< graph_core::subgraph::SubgraphRegistrar< graph_core::subgraph_provider::SubgraphProvider, @@ -354,6 +356,7 @@ where node_id.clone(), version_switching_mode, Arc::new(subgraph_settings), + amp_chain_names, )) } @@ -581,6 +584,7 @@ pub async fn run( .await; } + let amp_chain_names = Arc::new(config.amp_chain_names()); let subgraph_registrar = build_subgraph_registrar( metrics_registry.clone(), &network_store, @@ -595,6 +599,7 @@ pub async fn run( ipfs_service, amp_client, cancel_token, + amp_chain_names, ); graph::spawn( diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 4a3a94e92ec..2c23eb5151d 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -209,6 +209,7 @@ pub async fn run( let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {}); + let amp_chain_names = Arc::new(config.amp_chain_names()); let subgraph_registrar = Arc::new(graph_core::subgraph::SubgraphRegistrar::new( &logger_factory, link_resolver.cheap_clone(), @@ -220,6 +221,7 @@ pub async fn run( node_id.clone(), SubgraphVersionSwitchingMode::Instant, Arc::new(Settings::default()), + amp_chain_names, )); let (name, hash) = if subgraph.contains(':') { diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 93953986df5..234890730e5 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -636,6 +636,7 @@ pub async fn setup_inner( node_id.clone(), SubgraphVersionSwitchingMode::Instant, Arc::new(Settings::default()), + Arc::new(graph::components::network_provider::AmpChainNames::default()), )); SubgraphRegistrar::create_subgraph( From 8f0f5f51461e56389e9132422eea08e1d91f18d4 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 16 Feb 2026 15:22:04 -0800 Subject: [PATCH 2/2] amp: Change naming convention for CTEs The old convention made queries very hard to read. We now simply demand that users stay away from tables etc. 
whose name starts with `amp_` --- docs/amp-powered-subgraphs.md | 22 ++++++++- .../sql/query_builder/block_range_query.rs | 47 +++++++------------ .../amp/sql/query_builder/context_query.rs | 32 ++++--------- 3 files changed, 48 insertions(+), 53 deletions(-) diff --git a/docs/amp-powered-subgraphs.md b/docs/amp-powered-subgraphs.md index abec0123e88..a93335b835d 100644 --- a/docs/amp-powered-subgraphs.md +++ b/docs/amp-powered-subgraphs.md @@ -41,6 +41,7 @@ The minimum spec version for Amp-powered subgraphs is `1.5.0`. - name: Transfer file: ``` + ### Data source structure @@ -70,6 +71,7 @@ This is used to assign the subgraph to the appropriate indexing process. - name: Transfer file: ``` + ### `name` @@ -97,6 +99,7 @@ This name is used for observability purposes and to identify progress and potent - name: Transfer file: ``` + ### `network` @@ -127,6 +130,7 @@ This is used to validate that the SQL queries for this data source produce resul - name: Transfer file: ``` + ### `source` @@ -158,6 +162,7 @@ This is used to validate that the SQL queries for this data source only query th - name: Transfer file: ``` + ### `source.tables` @@ -185,6 +190,7 @@ This is used to validate that the SQL queries for this data source only query th - name: Transfer file: ``` + ### Optional `source.address` @@ -193,7 +199,7 @@ Contains the contract address with which SQL queries in the data source interact Enables SQL query reuse through `sg_source_address()` calls instead of hard-coding the contract address. SQL queries resolve `sg_source_address()` calls to this contract address. - +
Example YAML: @@ -215,6 +221,7 @@ SQL queries resolve `sg_source_address()` calls to this contract address. - name: Transfer file: ``` +
### Optional `source.startBlock` @@ -245,6 +252,7 @@ _When not provided, defaults to block number `0`._ - name: Transfer file: ``` + ### Optional `source.endBlock` @@ -275,6 +283,7 @@ _When not provided, defaults to the maximum possible block number._ - name: Transfer file: ``` + ### `transformer` @@ -309,6 +318,7 @@ Represents the version of this transformer. Each version may contain a different - name: Transfers file: ``` + ### Optional `transformer.abis` @@ -344,6 +354,7 @@ _When not provided, defaults to an empty list._ - name: Transfer file: ``` + ### `transformer.tables` @@ -368,6 +379,7 @@ type Block @entity(immutable: true) { ``` **YAML manifest:** + ```diff specVersion: 1.5.0 + dataSources: @@ -386,6 +398,7 @@ type Block @entity(immutable: true) { + - name: Block file: ``` + ### `transformer.tables[i].query` @@ -419,6 +432,7 @@ _When not provided, the `file` field is used instead._ - name: Block + query: SELECT * FROM "edgeandnode/ethereum_mainnet".blocks; ``` + ### `transformer.tables[i].file` @@ -451,6 +465,7 @@ _When not provided, the `query` field is used instead._ - name: Block + file: ``` + ### Amp-powered subgraph examples @@ -460,6 +475,11 @@ https://github.com/edgeandnode/amp-subgraph-examples ## SQL query requirements +### Names + +The names of tables, columns, common table expressions (CTEs), and aliases +must not start with `amp_`, as this prefix is reserved for internal use. + ### Block numbers Every SQL query in Amp-powered subgraphs must return the block number for every row. 
diff --git a/graph/src/amp/sql/query_builder/block_range_query.rs b/graph/src/amp/sql/query_builder/block_range_query.rs index dde44d803ad..496c4a9fbd1 100644 --- a/graph/src/amp/sql/query_builder/block_range_query.rs +++ b/graph/src/amp/sql/query_builder/block_range_query.rs @@ -1,10 +1,8 @@ use std::{ collections::BTreeMap, - hash::{BuildHasher, Hash, Hasher}, ops::{ControlFlow, RangeInclusive}, }; -use ahash::RandomState; use alloy::primitives::BlockNumber; use sqlparser_latest::ast::{self, VisitMut, VisitorMut}; @@ -23,11 +21,8 @@ pub(super) fn new_block_range_query( block_range: &RangeInclusive, ) -> ast::Query { // CTE names are unique within a SQL query. - // The hasher ensures that CTEs created for block range do not collide with user-defined CTEs. - // Constant seeds ensure consistent block range queries for the same input parameters. - let mut hasher = RandomState::with_seeds(0, 0, 0, 0).build_hasher(); - let tables_to_ctes_mapping = new_tables_to_ctes_mapping(query, &mut hasher); + let tables_to_ctes_mapping = new_tables_to_ctes_mapping(query); assert!(!tables_to_ctes_mapping.is_empty()); let mut cte_tables = Vec::with_capacity(tables_to_ctes_mapping.len()); @@ -46,24 +41,18 @@ pub(super) fn new_block_range_query( let block_range_query = format!( "WITH {cte_tables}, {source} AS ({query}) SELECT {source}.* FROM {source} ORDER BY {source}.{block_number_column}", cte_tables = cte_tables.join(", "), - source = format!("source_{}", hasher.finish()) + source = "amp_src" ); parse_query(block_range_query).unwrap() } /// Creates unique CTE names for every table referenced by the SQL query. 
-fn new_tables_to_ctes_mapping( - query: &ast::Query, - hasher: &mut impl Hasher, -) -> BTreeMap { +fn new_tables_to_ctes_mapping(query: &ast::Query) -> BTreeMap { extract_tables(query) .into_iter() - .map(|table| { - table.hash(hasher); - - (table, format!("block_range_{}", hasher.finish())) - }) + .enumerate() + .map(|(idx, table)| (table, format!("amp_br{}", idx + 1))) .collect() } @@ -137,18 +126,18 @@ mod tests { block_range_query.to_string(), parse_query( r#" - WITH block_range_14621009630487609643 AS ( + WITH amp_br1 AS ( SELECT * FROM d WHERE _block_num BETWEEN 0 AND 1000000 ), - source_14621009630487609643 AS ( - SELECT a, b, c FROM block_range_14621009630487609643 AS d + amp_src AS ( + SELECT a, b, c FROM amp_br1 AS d ) SELECT - source_14621009630487609643.* + amp_src.* FROM - source_14621009630487609643 + amp_src ORDER BY - source_14621009630487609643.b + amp_src.b "# ) .unwrap() @@ -167,21 +156,21 @@ mod tests { block_range_query.to_string(), parse_query( r#" - WITH block_range_14621009630487609643 AS ( + WITH amp_br1 AS ( SELECT * FROM d WHERE _block_num BETWEEN 0 AND 1000000 ), - block_range_12377422807768256314 AS ( + amp_br2 AS ( SELECT * FROM e WHERE _block_num BETWEEN 0 AND 1000000 ), - source_12377422807768256314 AS ( - SELECT a, b, c FROM block_range_14621009630487609643 AS d JOIN block_range_12377422807768256314 AS e ON e.e = d.d + amp_src AS ( + SELECT a, b, c FROM amp_br1 AS d JOIN amp_br2 AS e ON e.e = d.d ) SELECT - source_12377422807768256314.* + amp_src.* FROM - source_12377422807768256314 + amp_src ORDER BY - source_12377422807768256314.b + amp_src.b "# ) .unwrap() diff --git a/graph/src/amp/sql/query_builder/context_query.rs b/graph/src/amp/sql/query_builder/context_query.rs index cdff33ca4a3..00cb60a4eac 100644 --- a/graph/src/amp/sql/query_builder/context_query.rs +++ b/graph/src/amp/sql/query_builder/context_query.rs @@ -1,4 +1,3 @@ -use ahash::RandomState; use itertools::Itertools; use sqlparser_latest::ast; @@ -19,16 +18,11 @@ 
pub(super) fn new_context_query<'a>( context_columns: impl IntoIterator, ) -> ast::Query { // CTE names are unique within a SQL query. - // The hasher ensures that CTEs created for context do not collide with user-defined CTEs. - // Constant seeds ensure consistent context queries for the same input parameters. - let hasher = RandomState::with_seeds(0, 0, 0, 0); - let query_hash = hasher.hash_one(query); - let context_columns = context_columns.into_iter().collect_vec(); assert!(!context_columns.is_empty()); - let context_cte = format!("context_{query_hash}"); - let source_cte = format!("source_{query_hash}"); + let context_cte = "amp_ctx"; + let source_cte = "amp_src"; let context_query = format!( " @@ -81,21 +75,13 @@ mod tests { context_query, parse_query( " - WITH context_10500256449332496249 AS ( - SELECT DISTINCT _block_num, cx_c, cx_d FROM cx_a.cx_b - ), - source_10500256449332496249 AS ( - SELECT a, b, c FROM d - ) - SELECT - context_10500256449332496249.cx_c, - context_10500256449332496249.cx_d, - source_10500256449332496249.* - FROM - source_10500256449332496249 - INNER JOIN context_10500256449332496249 ON - context_10500256449332496249._block_num = source_10500256449332496249.b - " + WITH amp_ctx AS ( + SELECT DISTINCT _block_num, cx_c, cx_d FROM cx_a.cx_b + ), + amp_src AS (SELECT a, b, c FROM d) + SELECT amp_ctx.cx_c, amp_ctx.cx_d, amp_src.* + FROM amp_src + INNER JOIN amp_ctx ON amp_ctx._block_num = amp_src.b" ) .unwrap() )