diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs index b955d6102bec14..4cd0c83ad53ad7 100644 --- a/turbopack/crates/turbo-persistence/src/collector.rs +++ b/turbopack/crates/turbo-persistence/src/collector.rs @@ -2,7 +2,7 @@ use std::mem::take; use crate::{ ValueBuffer, - collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey}, + collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey, TINY_VALUE_THRESHOLD}, constants::{ DATA_THRESHOLD_PER_INITIAL_FILE, MAX_ENTRIES_PER_INITIAL_FILE, MAX_SMALL_VALUE_SIZE, }, @@ -47,11 +47,19 @@ impl Collector { }; let value = if value.len() > MAX_SMALL_VALUE_SIZE { CollectorEntryValue::Medium { - value: value.into_vec(), + value: value.into_boxed_slice(), + } + } else if value.len() <= TINY_VALUE_THRESHOLD { + let slice: &[u8] = &value; + let mut arr = [0u8; TINY_VALUE_THRESHOLD]; + arr[..slice.len()].copy_from_slice(slice); + CollectorEntryValue::Tiny { + value: arr, + len: slice.len() as u8, } } else { CollectorEntryValue::Small { - value: value.into_small_vec(), + value: value.into_boxed_slice(), } }; self.total_key_size += key.len(); diff --git a/turbopack/crates/turbo-persistence/src/collector_entry.rs b/turbopack/crates/turbo-persistence/src/collector_entry.rs index 88f49821d3c388..06dd0944524d18 100644 --- a/turbopack/crates/turbo-persistence/src/collector_entry.rs +++ b/turbopack/crates/turbo-persistence/src/collector_entry.rs @@ -1,7 +1,5 @@ use std::cmp::Ordering; -use smallvec::SmallVec; - use crate::{ key::StoreKey, static_sorted_file_builder::{Entry, EntryValue}, @@ -12,16 +10,34 @@ pub struct CollectorEntry { pub value: CollectorEntryValue, } +/// The size threshold for inline storage in CollectorEntryValue, this is the largest value that can +/// be stored inline without inflating the size of the enum +pub const TINY_VALUE_THRESHOLD: usize = 22; + pub enum CollectorEntryValue { - Small { value: SmallVec<[u8; 16]> }, - Medium { value: Vec }, - Large { blob: u32 }, + /// Tiny value stored inline (22 16 bytes, no heap allocation) + Tiny { + value: [u8; TINY_VALUE_THRESHOLD], + len: u8, + }, + /// Small value that fits in shared value blocks (> 16 bytes, ≤ MAX_SMALL_VALUE_SIZE) + Small { + value: Box<[u8]>, + }, + /// Medium value that gets its own value block (> MAX_SMALL_VALUE_SIZE) + Medium { + value: Box<[u8]>, + }, + Large { + blob: u32, + }, Deleted, } impl CollectorEntryValue { pub fn len(&self) -> usize { match self { + CollectorEntryValue::Tiny { len, .. } => *len as usize, CollectorEntryValue::Small { value } => value.len(), CollectorEntryValue::Medium { value } => value.len(), CollectorEntryValue::Large { blob: _ } => 0, @@ -78,6 +94,11 @@ impl Entry for CollectorEntry { fn value(&self) -> EntryValue<'_> { match &self.value { + // Tiny values are stored the same way as Small in the SST file, they just have an + // optimized representation here + CollectorEntryValue::Tiny { value, len } => EntryValue::Small { + value: &value[..*len as usize], + }, CollectorEntryValue::Small { value } => EntryValue::Small { value }, CollectorEntryValue::Medium { value } => EntryValue::Medium { value }, CollectorEntryValue::Large { blob } => EntryValue::Large { blob: *blob }, diff --git a/turbopack/crates/turbo-persistence/src/value_buf.rs b/turbopack/crates/turbo-persistence/src/value_buf.rs index 8e961a60c596d7..cd446fe079749b 100644 --- a/turbopack/crates/turbo-persistence/src/value_buf.rs +++ b/turbopack/crates/turbo-persistence/src/value_buf.rs @@ -9,19 +9,11 @@ pub enum ValueBuffer<'l> { } impl ValueBuffer<'_> { - pub fn into_vec(self) -> Vec { + pub fn into_boxed_slice(self) -> Box<[u8]> { match self { - ValueBuffer::Borrowed(b) => b.to_vec(), - ValueBuffer::Vec(v) => v, - ValueBuffer::SmallVec(sv) => sv.into_vec(), - } - } - - pub fn into_small_vec(self) -> SmallVec<[u8; 16]> { - match self { - ValueBuffer::Borrowed(b) => SmallVec::from_slice(b), - ValueBuffer::Vec(v) => SmallVec::from_vec(v), - ValueBuffer::SmallVec(sv) => sv, + ValueBuffer::Borrowed(b) => b.into(), + ValueBuffer::Vec(v) => v.into_boxed_slice(), + ValueBuffer::SmallVec(sv) => sv.into_vec().into_boxed_slice(), } } } diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 81ca849151b9be..5921ef1bf44ac2 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -175,6 +175,24 @@ impl } } } + // After flushing write all the full global collectors to disk. + // TODO: This can distribute work unfairly + // * a thread could fill up multiple global collectors and then get stuck writing them all + // out, if multiple threads could work on it we could take care of spare IO parallism + // * we can also have too much IO parallism with many threads concurrently writing files. + // + // Ideally we would limit the amount of data buffered in memory and control the amount of IO + // parallism. Consider: + // * store full-buffers as a field on WireBatch (queued writes) + // * each thread will attempt to poll and flush a full buffer after flushing its local + // buffer. + // This will distribute the writing work more fairly, but now we have the problem of to + // many concurrent writes contending for filesystem locks. So we could also use a semaphore + // to restrict how many concurrent writes occur. But then we would accumulate 'fullBuffers' + // leading to too much memory consumption. So really we also need to slow down the threads + // submitting work data. To do this we could simply use a tokio semaphore and make all + // these operations async, or we could integrate with the parallel::map operation that is + // driving the work to slow down task submission in this case. for mut global_collector in full_collectors { // When the global collector is full, we create a new SST file. let sst = self.create_sst_file(family, global_collector.sorted())?; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index c861afb69c7f42..52559a094322a4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -16,7 +16,7 @@ use std::{ }, }; -use anyhow::{Result, bail}; +use anyhow::{Context, Result, bail}; use auto_hash_map::{AutoMap, AutoSet}; use indexmap::IndexSet; use parking_lot::{Condvar, Mutex}; @@ -24,6 +24,7 @@ use rustc_hash::{FxHashMap, FxHashSet, FxHasher}; use smallvec::{SmallVec, smallvec}; use tokio::time::{Duration, Instant}; use tracing::{Span, trace_span}; +use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder}; use turbo_tasks::{ CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason, @@ -1017,42 +1018,32 @@ impl TurboTasksBackendInner { (meta, data) }; - let process = - |task_id: TaskId, (meta, data): (Option, Option)| { - // TODO: perf: Instead of returning a `Vec` of individually allocated `SmallVec`s, - // it'd be better to append everything to a flat per-task or - // per-shard `Vec`, and have each `serialize` call return - // `(start_idx, end_idx)`. + let process = |task_id: TaskId, + (meta, data): (Option, Option), + buffer: &mut TurboBincodeBuffer| { + ( + task_id, + meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)), + data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)), + ) + }; + let process_snapshot = + |task_id: TaskId, inner: Box, buffer: &mut TurboBincodeBuffer| { + if task_id.is_transient() { + return (task_id, None, None); + } + + // Encode meta/data directly from TaskStorage snapshot ( task_id, - meta.map(|d| { - self.backing_storage - .serialize(task_id, &d, SpecificTaskDataCategory::Meta) + inner.flags.meta_modified().then(|| { + encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer) }), - data.map(|d| { - self.backing_storage - .serialize(task_id, &d, SpecificTaskDataCategory::Data) + inner.flags.data_modified().then(|| { + encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer) }), ) }; - let process_snapshot = |task_id: TaskId, inner: Box| { - if task_id.is_transient() { - return (task_id, None, None); - } - - // Encode meta/data directly from TaskStorage snapshot - ( - task_id, - inner.flags.meta_modified().then(|| { - self.backing_storage - .serialize(task_id, &inner, SpecificTaskDataCategory::Meta) - }), - inner.flags.data_modified().then(|| { - self.backing_storage - .serialize(task_id, &inner, SpecificTaskDataCategory::Data) - }), - ) - }; let snapshot = self .storage @@ -3537,3 +3528,33 @@ fn far_future() -> Instant { // 1000 years overflows on macOS, 100 years overflows on FreeBSD. Instant::now() + Duration::from_secs(86400 * 365 * 30) } + +/// Encodes task data, using the provided buffer as a scratch space. Returns a new exactly sized +/// buffer. +/// This allows reusing the buffer across multiple encode calls to optimize allocations and +/// resulting buffer sizes. +fn encode_task_data( + task: TaskId, + data: &TaskStorage, + category: SpecificTaskDataCategory, + scratch_buffer: &mut TurboBincodeBuffer, +) -> Result { + scratch_buffer.clear(); + let mut encoder = new_turbo_bincode_encoder(scratch_buffer); + data.encode(category, &mut encoder)?; + + if cfg!(feature = "verify_serialization") { + TaskStorage::new() + .decode( + category, + &mut new_turbo_bincode_decoder(&scratch_buffer[..]), + ) + .with_context(|| { + format!( + "expected to be able to decode serialized data for '{category:?}' information \ + for {task}" + ) + })?; + } + Ok(SmallVec::from_slice(scratch_buffer)) +} diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index 67a462242f0403..f9578a23fc7284 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -5,6 +5,7 @@ use std::{ }; use smallvec::SmallVec; +use turbo_bincode::TurboBincodeBuffer; use turbo_tasks::{FxDashMap, TaskId, parallel}; use crate::{ @@ -106,13 +107,15 @@ impl Storage { /// the results. Ends snapshot mode afterwards. /// preprocess is potentially called within a lock, so it should be fast. /// process is called outside of locks, so it could do more expensive operations. + /// Both process and process_snapshot receive a mutable scratch buffer that can be reused + /// across iterations to avoid repeated allocations. pub fn take_snapshot< 'l, T, R, PP: for<'a> Fn(TaskId, &'a TaskStorage) -> T + Sync, - P: Fn(TaskId, T) -> R + Sync, - PS: Fn(TaskId, Box) -> R + Sync, + P: Fn(TaskId, T, &mut TurboBincodeBuffer) -> R + Sync, + PS: Fn(TaskId, Box, &mut TurboBincodeBuffer) -> R + Sync, >( &'l self, preprocess: &'l PP, @@ -153,7 +156,9 @@ impl Storage { // Safety: guard must outlive the iterator. drop(guard); } - + /// How big of a buffer to allocate initially. Based on metrics from a large + /// application this should cover about 98% of values with no resizes + const SCRATCH_BUFFER_SIZE: usize = 4096; SnapshotShard { direct_snapshots, modified, @@ -162,6 +167,7 @@ impl Storage { process, preprocess, process_snapshot, + scratch_buffer: TurboBincodeBuffer::with_capacity(SCRATCH_BUFFER_SIZE), } }) } @@ -362,26 +368,36 @@ pub struct SnapshotShard<'l, PP, P, PS> { process: &'l P, preprocess: &'l PP, process_snapshot: &'l PS, + /// Scratch buffer for encoding task data, reused across iterations to avoid allocations + scratch_buffer: TurboBincodeBuffer, } impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS> where PP: for<'a> Fn(TaskId, &'a TaskStorage) -> T + Sync, - P: Fn(TaskId, T) -> R + Sync, - PS: Fn(TaskId, Box) -> R + Sync, + P: Fn(TaskId, T, &mut TurboBincodeBuffer) -> R + Sync, + PS: Fn(TaskId, Box, &mut TurboBincodeBuffer) -> R + Sync, { type Item = R; fn next(&mut self) -> Option { if let Some((task_id, snapshot)) = self.direct_snapshots.pop() { - return Some((self.process_snapshot)(task_id, snapshot)); + return Some((self.process_snapshot)( + task_id, + snapshot, + &mut self.scratch_buffer, + )); } while let Some(task_id) = self.modified.pop() { let inner = self.storage.map.get(&task_id).unwrap(); if !inner.flags.any_snapshot() { let preprocessed = (self.preprocess)(task_id, &inner); drop(inner); - return Some((self.process)(task_id, preprocessed)); + return Some((self.process)( + task_id, + preprocessed, + &mut self.scratch_buffer, + )); } else { drop(inner); let maybe_snapshot = { @@ -392,7 +408,11 @@ where snapshot.take() }; if let Some(snapshot) = maybe_snapshot { - return Some((self.process_snapshot)(task_id, snapshot)); + return Some((self.process_snapshot)( + task_id, + snapshot, + &mut self.scratch_buffer, + )); } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index c927ee44018c93..ead13c1e61fa07 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -43,12 +43,6 @@ pub trait BackingStorageSealed: 'static + Send + Sync { type ReadTransaction<'l>; fn next_free_task_id(&self) -> Result; fn uncompleted_operations(&self) -> Result>; - fn serialize( - &self, - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, - ) -> Result>; fn save_snapshot( &self, @@ -126,14 +120,7 @@ where fn uncompleted_operations(&self) -> Result> { either::for_both!(self, this => this.uncompleted_operations()) } - fn serialize( - &self, - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, - ) -> Result> { - either::for_both!(self, this => this.serialize(task, data, category)) - } + fn save_snapshot( &self, operations: Vec>, diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 198079c9abfb23..dd8529dbad3ad7 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -7,8 +7,8 @@ use std::{ use anyhow::{Context, Result}; use turbo_bincode::{ - TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder, turbo_bincode_decode, - turbo_bincode_encode, turbo_bincode_encode_into, + TurboBincodeBuffer, new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode, + turbo_bincode_encode_into, }; use turbo_tasks::{ TaskId, @@ -250,15 +250,6 @@ impl BackingStorageSealed get(&self.inner.database).context("Unable to read uncompleted operations from database") } - fn serialize( - &self, - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, - ) -> Result { - encode_task_data(task, data, category) - } - fn save_snapshot( &self, operations: Vec>, @@ -797,30 +788,3 @@ where Ok(result) }) } - -fn encode_task_data( - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, -) -> Result { - // TODO: see if the caller can pass us a buffer instead of us allocating a new one. - // This should be possible and save a lot of small allocations. - let mut buffer = TurboBincodeBuffer::new(); - let mut encoder = new_turbo_bincode_encoder(&mut buffer); - data.encode(category, &mut encoder)?; - - if !cfg!(feature = "verify_serialization") { - return Ok(buffer); - } - - TaskStorage::new() - .decode(category, &mut new_turbo_bincode_decoder(buffer.borrow())) - .with_context(|| { - format!( - "expected to be able to decode serialized data for '{category:?}' information for \ - {task}" - ) - })?; - - Ok(buffer) -}