Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graph::amp;
use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap};
use graph::components::{
link_resolver::LinkResolverContext,
network_provider::AmpChainNames,
store::{DeploymentId, DeploymentLocator, SubscriptionManager},
subgraph::Settings,
};
Expand All @@ -30,6 +31,7 @@ pub struct SubgraphRegistrar<P, S, SM, AC> {
version_switching_mode: SubgraphVersionSwitchingMode,
assignment_event_stream_cancel_guard: CancelGuard, // cancels on drop
settings: Arc<Settings>,
amp_chain_names: Arc<AmpChainNames>,
}

impl<P, S, SM, AC> SubgraphRegistrar<P, S, SM, AC>
Expand All @@ -50,6 +52,7 @@ where
node_id: NodeId,
version_switching_mode: SubgraphVersionSwitchingMode,
settings: Arc<Settings>,
amp_chain_names: Arc<AmpChainNames>,
) -> Self {
let logger = logger_factory.component_logger("SubgraphRegistrar", None);
let logger_factory = logger_factory.with_parent(logger.clone());
Expand All @@ -69,6 +72,7 @@ where
version_switching_mode,
assignment_event_stream_cancel_guard: CancelGuard::new(),
settings,
amp_chain_names,
}
}

Expand Down Expand Up @@ -314,6 +318,7 @@ where
&resolver,
self.amp_client.cheap_clone(),
history_blocks,
&self.amp_chain_names,
)
.await?
}
Expand All @@ -333,6 +338,7 @@ where
&resolver,
self.amp_client.cheap_clone(),
history_blocks,
&self.amp_chain_names,
)
.await?
}
Expand Down Expand Up @@ -460,6 +466,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore, AC: amp::Clien
resolver: &Arc<dyn LinkResolver>,
amp_client: Option<Arc<AC>>,
history_blocks_override: Option<i32>,
amp_chain_names: &AmpChainNames,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();

Expand Down Expand Up @@ -488,9 +495,10 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore, AC: amp::Clien
.map_err(SubgraphRegistrarError::ManifestValidationError)?;

let network_name: Word = manifest.network_name().into();
let resolved_name = amp_chain_names.resolve(&network_name);

let chain = chains
.get::<C>(network_name.clone())
.get::<C>(resolved_name.clone())
.map_err(SubgraphRegistrarError::NetworkNotSupported)?
.cheap_clone();

Expand Down Expand Up @@ -570,7 +578,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore, AC: amp::Clien
&manifest.schema,
deployment,
node_id,
network_name.into(),
resolved_name.into(),
version_switching_mode,
)
.await
Expand Down
22 changes: 21 additions & 1 deletion docs/amp-powered-subgraphs.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The minimum spec version for Amp-powered subgraphs is `1.5.0`.
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### Data source structure
Expand Down Expand Up @@ -70,6 +71,7 @@ This is used to assign the subgraph to the appropriate indexing process.
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### `name`
Expand Down Expand Up @@ -97,6 +99,7 @@ This name is used for observability purposes and to identify progress and potent
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### `network`
Expand Down Expand Up @@ -127,6 +130,7 @@ This is used to validate that the SQL queries for this data source produce resul
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### `source`
Expand Down Expand Up @@ -158,6 +162,7 @@ This is used to validate that the SQL queries for this data source only query th
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### `source.tables`
Expand Down Expand Up @@ -185,6 +190,7 @@ This is used to validate that the SQL queries for this data source only query th
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### Optional `source.address`
Expand All @@ -193,7 +199,7 @@ Contains the contract address with which SQL queries in the data source interact

Enables SQL query reuse through `sg_source_address()` calls instead of hard-coding the contract address.
SQL queries resolve `sg_source_address()` calls to this contract address.

<details>
<summary>Example YAML:</summary>

Expand All @@ -215,6 +221,7 @@ SQL queries resolve `sg_source_address()` calls to this contract address.
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### Optional `source.startBlock`
Expand Down Expand Up @@ -245,6 +252,7 @@ _When not provided, defaults to block number `0`._
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### Optional `source.endBlock`
Expand Down Expand Up @@ -275,6 +283,7 @@ _When not provided, defaults to the maximum possible block number._
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### `transformer`
Expand Down Expand Up @@ -309,6 +318,7 @@ Represents the version of this transformer. Each version may contain a different
- name: Transfers
file: <IPFS CID of the SQL query file>
```

</details>

### Optional `transformer.abis`
Expand Down Expand Up @@ -344,6 +354,7 @@ _When not provided, defaults to an empty list._
- name: Transfer
file: <IPFS CID of the SQL query file>
```

</details>

### `transformer.tables`
Expand All @@ -368,6 +379,7 @@ type Block @entity(immutable: true) {
```

**YAML manifest:**

```diff
specVersion: 1.5.0
+ dataSources:
Expand All @@ -386,6 +398,7 @@ type Block @entity(immutable: true) {
+ - name: Block
file: <IPFS CID of the SQL query file>
```

</details>

### `transformer.tables[i].query`
Expand Down Expand Up @@ -419,6 +432,7 @@ _When not provided, the `file` field is used instead._
- name: Block
+ query: SELECT * FROM "edgeandnode/ethereum_mainnet".blocks;
```

</details>

### `transformer.tables[i].file`
Expand Down Expand Up @@ -451,6 +465,7 @@ _When not provided, the `query` field is used instead._
- name: Block
+ file: <IPFS CID of the SQL query file>
```

</details>

### Amp-powered subgraph examples
Expand All @@ -460,6 +475,11 @@ https://github.com/edgeandnode/amp-subgraph-examples

## SQL query requirements

### Names

The names of tables, columns, and aliases must not start with `amp_`, as this
prefix is reserved for internal use.

### Block numbers

Every SQL query in Amp-powered subgraphs must return the block number for every row.
Expand Down
4 changes: 4 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ The configuration for a chain `name` is specified in the section
- `protocol`: the protocol type being indexed, default `ethereum`
(alternatively `near`, `cosmos`, `arweave`, `starknet`)
- `polling_interval`: the polling interval for the block ingestor (default 500ms)
- `amp`: the network name used by AMP for this chain; defaults to the chain name.
  Set this when AMP refers to the chain by a different name than graph-node does
  (e.g., `amp = "ethereum-mainnet"` for a chain named `mainnet`).
- `provider`: a list of providers for that chain

A `provider` is an object with the following characteristics:
Expand Down Expand Up @@ -164,6 +167,7 @@ optimisations.
ingestor = "block_ingestor_node"
[chains.mainnet]
shard = "vip"
amp = "ethereum-mainnet"
provider = [
{ label = "mainnet1", url = "http://..", features = [], headers = { Authorization = "Bearer foo" } },
{ label = "mainnet2", url = "http://..", features = [ "archive", "traces" ] }
Expand Down
47 changes: 18 additions & 29 deletions graph/src/amp/sql/query_builder/block_range_query.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{
collections::BTreeMap,
hash::{BuildHasher, Hash, Hasher},
ops::{ControlFlow, RangeInclusive},
};

use ahash::RandomState;
use alloy::primitives::BlockNumber;
use sqlparser_latest::ast::{self, VisitMut, VisitorMut};

Expand All @@ -23,11 +21,8 @@ pub(super) fn new_block_range_query(
block_range: &RangeInclusive<BlockNumber>,
) -> ast::Query {
// CTE names are unique within a SQL query.
// The hasher ensures that CTEs created for block range do not collide with user-defined CTEs.
// Constant seeds ensure consistent block range queries for the same input parameters.
let mut hasher = RandomState::with_seeds(0, 0, 0, 0).build_hasher();

let tables_to_ctes_mapping = new_tables_to_ctes_mapping(query, &mut hasher);
let tables_to_ctes_mapping = new_tables_to_ctes_mapping(query);
assert!(!tables_to_ctes_mapping.is_empty());

let mut cte_tables = Vec::with_capacity(tables_to_ctes_mapping.len());
Expand All @@ -46,24 +41,18 @@ pub(super) fn new_block_range_query(
let block_range_query = format!(
"WITH {cte_tables}, {source} AS ({query}) SELECT {source}.* FROM {source} ORDER BY {source}.{block_number_column}",
cte_tables = cte_tables.join(", "),
source = format!("source_{}", hasher.finish())
source = format!("amp_src")
);

parse_query(block_range_query).unwrap()
}

/// Creates unique CTE names for every table referenced by the SQL query.
fn new_tables_to_ctes_mapping(
query: &ast::Query,
hasher: &mut impl Hasher,
) -> BTreeMap<TableReference, String> {
fn new_tables_to_ctes_mapping(query: &ast::Query) -> BTreeMap<TableReference, String> {
extract_tables(query)
.into_iter()
.map(|table| {
table.hash(hasher);

(table, format!("block_range_{}", hasher.finish()))
})
.enumerate()
.map(|(idx, table)| (table, format!("amp_br{}", idx + 1)))
.collect()
}

Expand Down Expand Up @@ -137,18 +126,18 @@ mod tests {
block_range_query.to_string(),
parse_query(
r#"
WITH block_range_14621009630487609643 AS (
WITH amp_br1 AS (
SELECT * FROM d WHERE _block_num BETWEEN 0 AND 1000000
),
source_14621009630487609643 AS (
SELECT a, b, c FROM block_range_14621009630487609643 AS d
amp_src AS (
SELECT a, b, c FROM amp_br1 AS d
)
SELECT
source_14621009630487609643.*
amp_src.*
FROM
source_14621009630487609643
amp_src
ORDER BY
source_14621009630487609643.b
amp_src.b
"#
)
.unwrap()
Expand All @@ -167,21 +156,21 @@ mod tests {
block_range_query.to_string(),
parse_query(
r#"
WITH block_range_14621009630487609643 AS (
WITH amp_br1 AS (
SELECT * FROM d WHERE _block_num BETWEEN 0 AND 1000000
),
block_range_12377422807768256314 AS (
amp_br2 AS (
SELECT * FROM e WHERE _block_num BETWEEN 0 AND 1000000
),
source_12377422807768256314 AS (
SELECT a, b, c FROM block_range_14621009630487609643 AS d JOIN block_range_12377422807768256314 AS e ON e.e = d.d
amp_src AS (
SELECT a, b, c FROM amp_br1 AS d JOIN amp_br2 AS e ON e.e = d.d
)
SELECT
source_12377422807768256314.*
amp_src.*
FROM
source_12377422807768256314
amp_src
ORDER BY
source_12377422807768256314.b
amp_src.b
"#
)
.unwrap()
Expand Down
32 changes: 9 additions & 23 deletions graph/src/amp/sql/query_builder/context_query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use ahash::RandomState;
use itertools::Itertools;
use sqlparser_latest::ast;

Expand All @@ -19,16 +18,11 @@ pub(super) fn new_context_query<'a>(
context_columns: impl IntoIterator<Item = &'a str>,
) -> ast::Query {
// CTE names are unique within a SQL query.
// The hasher ensures that CTEs created for context do not collide with user-defined CTEs.
// Constant seeds ensure consistent context queries for the same input parameters.
let hasher = RandomState::with_seeds(0, 0, 0, 0);
let query_hash = hasher.hash_one(query);

let context_columns = context_columns.into_iter().collect_vec();
assert!(!context_columns.is_empty());

let context_cte = format!("context_{query_hash}");
let source_cte = format!("source_{query_hash}");
let context_cte = "amp_ctx";
let source_cte = "amp_src";

let context_query = format!(
"
Expand Down Expand Up @@ -81,21 +75,13 @@ mod tests {
context_query,
parse_query(
"
WITH context_10500256449332496249 AS (
SELECT DISTINCT _block_num, cx_c, cx_d FROM cx_a.cx_b
),
source_10500256449332496249 AS (
SELECT a, b, c FROM d
)
SELECT
context_10500256449332496249.cx_c,
context_10500256449332496249.cx_d,
source_10500256449332496249.*
FROM
source_10500256449332496249
INNER JOIN context_10500256449332496249 ON
context_10500256449332496249._block_num = source_10500256449332496249.b
"
WITH amp_ctx AS (
SELECT DISTINCT _block_num, cx_c, cx_d FROM cx_a.cx_b
),
amp_src AS (SELECT a, b, c FROM d)
SELECT amp_ctx.cx_c, amp_ctx.cx_d, amp_src.*
FROM amp_src
INNER JOIN amp_ctx ON amp_ctx._block_num = amp_src.b"
)
.unwrap()
)
Expand Down
Loading