Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions graph/src/env/mappings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ pub struct EnvVarsMapping {
/// Maximum backoff time for FDS requests. Set by
/// `GRAPH_FDS_MAX_BACKOFF` in seconds, defaults to 600.
pub fds_max_backoff: Duration,

/// Cranelift optimization level for WASM compilation.
///
/// Set by the environment variable `GRAPH_WASM_OPT_LEVEL`. Valid values
/// are `none`, `speed`, and `speed_and_size`. The default value is
/// `speed`.
pub wasm_opt_level: WasmOptLevel,
}

/// Cranelift optimization level for WASM compilation. Maps to
/// `wasmtime::OptLevel` without introducing a dependency on wasmtime.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum WasmOptLevel {
None,
Speed,
SpeedAndSize,
}

// This does not print any values avoid accidentally leaking any sensitive env vars
Expand All @@ -91,6 +107,32 @@ impl fmt::Debug for EnvVarsMapping {
}
}

impl fmt::Display for WasmOptLevel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
WasmOptLevel::None => write!(f, "none"),
WasmOptLevel::Speed => write!(f, "speed"),
WasmOptLevel::SpeedAndSize => write!(f, "speed_and_size"),
}
}
}

impl FromStr for WasmOptLevel {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"none" => Ok(WasmOptLevel::None),
"speed" => Ok(WasmOptLevel::Speed),
"speed_and_size" => Ok(WasmOptLevel::SpeedAndSize),
_ => Err(format!(
"invalid GRAPH_WASM_OPT_LEVEL '{}', expected 'none', 'speed', or 'speed_and_size'",
s
)),
}
}
}

impl TryFrom<InnerMappingHandlers> for EnvVarsMapping {
type Error = anyhow::Error;

Expand Down Expand Up @@ -121,6 +163,7 @@ impl TryFrom<InnerMappingHandlers> for EnvVarsMapping {
disable_declared_calls: x.disable_declared_calls.0,
store_errors_are_nondeterministic: x.store_errors_are_nondeterministic.0,
fds_max_backoff: Duration::from_secs(x.fds_max_backoff),
wasm_opt_level: x.wasm_opt_level,
};
Ok(vars)
}
Expand Down Expand Up @@ -164,6 +207,8 @@ pub struct InnerMappingHandlers {
store_errors_are_nondeterministic: EnvVarBoolean,
#[envconfig(from = "GRAPH_FDS_MAX_BACKOFF", default = "600")]
fds_max_backoff: u64,
#[envconfig(from = "GRAPH_WASM_OPT_LEVEL", default = "speed")]
wasm_opt_level: WasmOptLevel,
}

fn validate_ipfs_cache_location(path: PathBuf) -> Result<PathBuf, anyhow::Error> {
Expand Down
2 changes: 2 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use semver::Version;

use self::graphql::*;
use self::mappings::*;

pub use self::mappings::WasmOptLevel;
use self::store::*;
use crate::{
components::{store::BlockNumber, subgraph::SubgraphVersionSwitchingMode},
Expand Down
2 changes: 1 addition & 1 deletion graph/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl_asc_type!(u8, u16, u32, u64, i8, i32, i64, f32, f64);
/// 3. Once defined, items and their discriminants cannot be changed, as this would break running
/// subgraphs compiled in previous versions of this representation.
#[repr(u32)]
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum IndexForAscTypeId {
// Ethereum type IDs
String = 0,
Expand Down
10 changes: 5 additions & 5 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn json_conversions_v0_0_4() {

#[graph::test]
async fn json_conversions_v0_0_5() {
test_json_conversions(API_VERSION_0_0_5, 2289897).await;
test_json_conversions(API_VERSION_0_0_5, 2214813).await;
}

async fn test_json_parsing(api_version: Version, gas_used: u64) {
Expand Down Expand Up @@ -766,7 +766,7 @@ async fn big_int_to_hex_v0_0_4() {

#[graph::test]
async fn big_int_to_hex_v0_0_5() {
test_big_int_to_hex(API_VERSION_0_0_5, 2858580).await;
test_big_int_to_hex(API_VERSION_0_0_5, 2565990).await;
}

async fn test_big_int_arithmetic(api_version: Version, gas_used: u64) {
Expand Down Expand Up @@ -832,7 +832,7 @@ async fn big_int_arithmetic_v0_0_4() {

#[graph::test]
async fn big_int_arithmetic_v0_0_5() {
test_big_int_arithmetic(API_VERSION_0_0_5, 7318364).await;
test_big_int_arithmetic(API_VERSION_0_0_5, 5256825).await;
}

async fn test_abort(api_version: Version, error_msg: &str) {
Expand Down Expand Up @@ -970,7 +970,7 @@ async fn data_source_create_v0_0_4() {

#[graph::test]
async fn data_source_create_v0_0_5() {
test_data_source_create(API_VERSION_0_0_5, 101450079).await;
test_data_source_create(API_VERSION_0_0_5, 101425051).await;
}

async fn test_ens_name_by_hash(api_version: Version) {
Expand Down Expand Up @@ -1827,5 +1827,5 @@ async fn yaml_parsing_v0_0_4() {

#[graph::test]
async fn yaml_parsing_v0_0_5() {
test_yaml_parsing(API_VERSION_0_0_5, 1053955992265).await;
test_yaml_parsing(API_VERSION_0_0_5, 1053946160531).await;
}
2 changes: 1 addition & 1 deletion runtime/test/src/test/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn abi_array_v0_0_4() {

#[graph::test]
async fn abi_array_v0_0_5() {
test_abi_array(API_VERSION_0_0_5, 1636130).await;
test_abi_array(API_VERSION_0_0_5, 1561046).await;
}

async fn test_abi_subarray(api_version: Version) {
Expand Down
41 changes: 35 additions & 6 deletions runtime/wasm/src/mapping.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::gas_rules::GasRules;
use crate::module::{ExperimentalFeatures, ToAscPtr, WasmInstance};
use crate::module::{ExperimentalFeatures, ToAscPtr, WasmInstance, WasmInstanceData};
use graph::blockchain::{BlockTime, Blockchain, HostFn};
use graph::components::store::SubgraphFork;
use graph::components::subgraph::{MappingError, SharedProofOfIndexing};
use graph::data_source::{MappingTrigger, TriggerWithHandler};
use graph::futures01::sync::mpsc;
use graph::futures01::{Future as _, Stream as _};
use graph::futures03::channel::oneshot::Sender;
use graph::parking_lot::RwLock;
use graph::prelude::*;
use graph::runtime::gas::Gas;
use graph::runtime::IndexForAscTypeId;
use parity_wasm::elements::ExportEntry;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -222,6 +224,11 @@ const GN_START_FUNCTION_NAME: &str = "gn::start";
pub struct ValidModule {
pub module: wasmtime::Module,

/// Pre-linked instance template. Created once at module validation time and reused for every
/// trigger instantiation, avoiding the cost of rebuilding the linker (~60 host function
/// registrations) and resolving imports on each trigger.
pub instance_pre: wasmtime::InstancePre<WasmInstanceData>,

// Due to our internal architecture we don't want to run the start function at instantiation time,
// so we track it separately so that we can run it at an appropriate time.
// Since the start function is not an export, we will also create an export for it.
Expand All @@ -243,6 +250,10 @@ pub struct ValidModule {

// Used as a guard to terminate this task dependency.
epoch_counter_abort_handle: Option<tokio::task::AbortHandle>,

/// Cache for asc_type_id results. Maps IndexForAscTypeId to their WASM runtime
/// type IDs. Populated lazily on first use; deterministic per compiled module.
asc_type_id_cache: RwLock<HashMap<IndexForAscTypeId, u32>>,
}

impl ValidModule {
Expand Down Expand Up @@ -291,14 +302,19 @@ impl ValidModule {
.map_err(|_| anyhow!("Failed to inject gas counter"))?;
let raw_module = parity_module.into_bytes()?;

// We currently use Cranelift as a compilation engine. Cranelift is an optimizing compiler,
// but that should not cause determinism issues since it adheres to the Wasm spec. Still we
// turn off optional optimizations to be conservative.
// We use Cranelift as a compilation engine. Cranelift is an optimizing compiler, but that
// should not cause determinism issues since it adheres to the Wasm spec and NaN
// canonicalization is enabled below. The optimization level is configurable via
// GRAPH_WASM_OPT_LEVEL (default: speed).
let mut config = wasmtime::Config::new();
config.strategy(wasmtime::Strategy::Cranelift);
config.epoch_interruption(true);
config.cranelift_nan_canonicalization(true); // For NaN determinism.
config.cranelift_opt_level(wasmtime::OptLevel::None);
config.cranelift_opt_level(match ENV_VARS.mappings.wasm_opt_level {
graph::env::WasmOptLevel::None => wasmtime::OptLevel::None,
graph::env::WasmOptLevel::Speed => wasmtime::OptLevel::Speed,
graph::env::WasmOptLevel::SpeedAndSize => wasmtime::OptLevel::SpeedAndSize,
});
config.max_wasm_stack(ENV_VARS.mappings.max_stack_size);
config.async_support(true);

Expand Down Expand Up @@ -335,14 +351,27 @@ impl ValidModule {
epoch_counter_abort_handle = Some(graph::spawn(epoch_counter).abort_handle());
}

let linker = crate::module::build_linker(engine, &import_name_to_modules)?;
let instance_pre = linker.instantiate_pre(&module)?;

Ok(ValidModule {
module,
instance_pre,
import_name_to_modules,
start_function,
timeout,
epoch_counter_abort_handle,
asc_type_id_cache: RwLock::new(HashMap::new()),
})
}

pub fn get_cached_type_id(&self, idx: IndexForAscTypeId) -> Option<u32> {
self.asc_type_id_cache.read().get(&idx).copied()
}

pub fn cache_type_id(&self, idx: IndexForAscTypeId, type_id: u32) {
self.asc_type_id_cache.write().insert(idx, type_id);
}
}

impl Drop for ValidModule {
Expand Down
5 changes: 5 additions & 0 deletions runtime/wasm/src/module/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct WasmInstanceData {
pub valid_module: Arc<ValidModule>,
pub host_metrics: Arc<HostMetrics>,

// Per-trigger gas counter, shared via Arc so clones refer to the same counter.
pub gas: GasCounter,

// A trap ocurred due to a possible reorg detection.
pub possible_reorg: bool,

Expand All @@ -100,13 +103,15 @@ impl WasmInstanceData {
ctx: MappingContext,
valid_module: Arc<ValidModule>,
host_metrics: Arc<HostMetrics>,
gas: GasCounter,
experimental_features: ExperimentalFeatures,
) -> Self {
WasmInstanceData {
asc_heap: None,
ctx,
valid_module,
host_metrics,
gas,
possible_reorg: false,
deterministic_host_trap: false,
experimental_features,
Expand Down
Loading
Loading