From 1637d61731065b5ad5705c159967f213c3cf624d Mon Sep 17 00:00:00 2001 From: Luke Sandberg Date: Sun, 25 Jan 2026 10:22:23 -0800 Subject: [PATCH] [Turbopack] Use a presized scratch buffer for task encoding (#88924) ### What Pass a shared buffer to use as scratch space for encoding TaskStorage values. Also reduce the size of the `CollectorEntryValue` enum (32->24 bytes), our use of a `SmallVec` was inefficient, instead we can store more inline data by doing it ourselves (22 bytes instead of 16) ### Why Currently whenever we encode `TaskStorage` we allocate a new `TurboBincodeBuffer` (aka `SmallVec<[u8;16>`) only the very smallest TaskStorage values fit in that space, so we are always allocating and resizing a buffer for every `TaskStorage` we encode. Using a shared scratch buffer we avoid resizes and allocations during encoding but now always need to copy our data out of the shared buffer. This should reduce temporary allocations and buffer copies from the resizes in the common case. As well as ensure that the buffers we pass through to the collectors are exactly sized I presized it as 4096 bytes since this covers ~98% of tasks we encode, and it is a nice magic number A future optimization could be to accumulate all writes for every SnapshotShard in a single buffer that we pass down to the `Collector`. --- .../crates/turbo-persistence/src/collector.rs | 14 +++- .../turbo-persistence/src/collector_entry.rs | 31 +++++-- .../crates/turbo-persistence/src/value_buf.rs | 16 +--- .../turbo-persistence/src/write_batch.rs | 18 ++++ .../turbo-tasks-backend/src/backend/mod.rs | 83 ++++++++++++------- .../src/backend/storage.rs | 36 ++++++-- .../src/backing_storage.rs | 15 +--- .../src/kv_backing_storage.rs | 40 +-------- 8 files changed, 142 insertions(+), 111 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs index b955d6102bec14..4cd0c83ad53ad7 100644 --- a/turbopack/crates/turbo-persistence/src/collector.rs +++ b/turbopack/crates/turbo-persistence/src/collector.rs @@ -2,7 +2,7 @@ use std::mem::take; use crate::{ ValueBuffer, - collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey}, + collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey, TINY_VALUE_THRESHOLD}, constants::{ DATA_THRESHOLD_PER_INITIAL_FILE, MAX_ENTRIES_PER_INITIAL_FILE, MAX_SMALL_VALUE_SIZE, }, @@ -47,11 +47,19 @@ impl Collector { }; let value = if value.len() > MAX_SMALL_VALUE_SIZE { CollectorEntryValue::Medium { - value: value.into_vec(), + value: value.into_boxed_slice(), + } + } else if value.len() <= TINY_VALUE_THRESHOLD { + let slice: &[u8] = &value; + let mut arr = [0u8; TINY_VALUE_THRESHOLD]; + arr[..slice.len()].copy_from_slice(slice); + CollectorEntryValue::Tiny { + value: arr, + len: slice.len() as u8, } } else { CollectorEntryValue::Small { - value: value.into_small_vec(), + value: value.into_boxed_slice(), } }; self.total_key_size += key.len(); diff --git a/turbopack/crates/turbo-persistence/src/collector_entry.rs b/turbopack/crates/turbo-persistence/src/collector_entry.rs index 88f49821d3c388..06dd0944524d18 100644 --- a/turbopack/crates/turbo-persistence/src/collector_entry.rs +++ b/turbopack/crates/turbo-persistence/src/collector_entry.rs @@ -1,7 +1,5 @@ use std::cmp::Ordering; -use smallvec::SmallVec; - use crate::{ key::StoreKey, static_sorted_file_builder::{Entry, EntryValue}, @@ -12,16 +10,34 @@ pub struct CollectorEntry { pub value: CollectorEntryValue, } +/// The size threshold for inline storage in CollectorEntryValue, this is the largest value that can +/// be stored inline without inflating the size of the enum +pub const TINY_VALUE_THRESHOLD: usize = 22; + pub enum CollectorEntryValue { - Small { value: SmallVec<[u8; 16]> }, - Medium { value: Vec }, - Large { blob: u32 }, + /// Tiny value stored inline (22 16 bytes, no heap allocation) + Tiny { + value: [u8; TINY_VALUE_THRESHOLD], + len: u8, + }, + /// Small value that fits in shared value blocks (> 16 bytes, ≤ MAX_SMALL_VALUE_SIZE) + Small { + value: Box<[u8]>, + }, + /// Medium value that gets its own value block (> MAX_SMALL_VALUE_SIZE) + Medium { + value: Box<[u8]>, + }, + Large { + blob: u32, + }, Deleted, } impl CollectorEntryValue { pub fn len(&self) -> usize { match self { + CollectorEntryValue::Tiny { len, .. } => *len as usize, CollectorEntryValue::Small { value } => value.len(), CollectorEntryValue::Medium { value } => value.len(), CollectorEntryValue::Large { blob: _ } => 0, @@ -78,6 +94,11 @@ impl Entry for CollectorEntry { fn value(&self) -> EntryValue<'_> { match &self.value { + // Tiny values are stored the same way as Small in the SST file, they just have an + // optimized representation here + CollectorEntryValue::Tiny { value, len } => EntryValue::Small { + value: &value[..*len as usize], + }, CollectorEntryValue::Small { value } => EntryValue::Small { value }, CollectorEntryValue::Medium { value } => EntryValue::Medium { value }, CollectorEntryValue::Large { blob } => EntryValue::Large { blob: *blob }, diff --git a/turbopack/crates/turbo-persistence/src/value_buf.rs b/turbopack/crates/turbo-persistence/src/value_buf.rs index 8e961a60c596d7..cd446fe079749b 100644 --- a/turbopack/crates/turbo-persistence/src/value_buf.rs +++ b/turbopack/crates/turbo-persistence/src/value_buf.rs @@ -9,19 +9,11 @@ pub enum ValueBuffer<'l> { } impl ValueBuffer<'_> { - pub fn into_vec(self) -> Vec { + pub fn into_boxed_slice(self) -> Box<[u8]> { match self { - ValueBuffer::Borrowed(b) => b.to_vec(), - ValueBuffer::Vec(v) => v, - ValueBuffer::SmallVec(sv) => sv.into_vec(), - } - } - - pub fn into_small_vec(self) -> SmallVec<[u8; 16]> { - match self { - ValueBuffer::Borrowed(b) => SmallVec::from_slice(b), - ValueBuffer::Vec(v) => SmallVec::from_vec(v), - ValueBuffer::SmallVec(sv) => sv, + ValueBuffer::Borrowed(b) => b.into(), + ValueBuffer::Vec(v) => v.into_boxed_slice(), + ValueBuffer::SmallVec(sv) => sv.into_vec().into_boxed_slice(), } } } diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 81ca849151b9be..5921ef1bf44ac2 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -175,6 +175,24 @@ impl } } } + // After flushing write all the full global collectors to disk. + // TODO: This can distribute work unfairly + // * a thread could fill up multiple global collectors and then get stuck writing them all + // out, if multiple threads could work on it we could take care of spare IO parallism + // * we can also have too much IO parallism with many threads concurrently writing files. + // + // Ideally we would limit the amount of data buffered in memory and control the amount of IO + // parallism. Consider: + // * store full-buffers as a field on WireBatch (queued writes) + // * each thread will attempt to poll and flush a full buffer after flushing its local + // buffer. + // This will distribute the writing work more fairly, but now we have the problem of to + // many concurrent writes contending for filesystem locks. So we could also use a semaphore + // to restrict how many concurrent writes occur. But then we would accumulate 'fullBuffers' + // leading to too much memory consumption. So really we also need to slow down the threads + // submitting work data. To do this we could simply use a tokio semaphore and make all + // these operations async, or we could integrate with the parallel::map operation that is + // driving the work to slow down task submission in this case. for mut global_collector in full_collectors { // When the global collector is full, we create a new SST file. let sst = self.create_sst_file(family, global_collector.sorted())?; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index c861afb69c7f42..52559a094322a4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -16,7 +16,7 @@ use std::{ }, }; -use anyhow::{Result, bail}; +use anyhow::{Context, Result, bail}; use auto_hash_map::{AutoMap, AutoSet}; use indexmap::IndexSet; use parking_lot::{Condvar, Mutex}; @@ -24,6 +24,7 @@ use rustc_hash::{FxHashMap, FxHashSet, FxHasher}; use smallvec::{SmallVec, smallvec}; use tokio::time::{Duration, Instant}; use tracing::{Span, trace_span}; +use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder}; use turbo_tasks::{ CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason, @@ -1017,42 +1018,32 @@ impl TurboTasksBackendInner { (meta, data) }; - let process = - |task_id: TaskId, (meta, data): (Option, Option)| { - // TODO: perf: Instead of returning a `Vec` of individually allocated `SmallVec`s, - // it'd be better to append everything to a flat per-task or - // per-shard `Vec`, and have each `serialize` call return - // `(start_idx, end_idx)`. + let process = |task_id: TaskId, + (meta, data): (Option, Option), + buffer: &mut TurboBincodeBuffer| { + ( + task_id, + meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)), + data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)), + ) + }; + let process_snapshot = + |task_id: TaskId, inner: Box, buffer: &mut TurboBincodeBuffer| { + if task_id.is_transient() { + return (task_id, None, None); + } + + // Encode meta/data directly from TaskStorage snapshot ( task_id, - meta.map(|d| { - self.backing_storage - .serialize(task_id, &d, SpecificTaskDataCategory::Meta) + inner.flags.meta_modified().then(|| { + encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer) }), - data.map(|d| { - self.backing_storage - .serialize(task_id, &d, SpecificTaskDataCategory::Data) + inner.flags.data_modified().then(|| { + encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer) }), ) }; - let process_snapshot = |task_id: TaskId, inner: Box| { - if task_id.is_transient() { - return (task_id, None, None); - } - - // Encode meta/data directly from TaskStorage snapshot - ( - task_id, - inner.flags.meta_modified().then(|| { - self.backing_storage - .serialize(task_id, &inner, SpecificTaskDataCategory::Meta) - }), - inner.flags.data_modified().then(|| { - self.backing_storage - .serialize(task_id, &inner, SpecificTaskDataCategory::Data) - }), - ) - }; let snapshot = self .storage @@ -3537,3 +3528,33 @@ fn far_future() -> Instant { // 1000 years overflows on macOS, 100 years overflows on FreeBSD. Instant::now() + Duration::from_secs(86400 * 365 * 30) } + +/// Encodes task data, using the provided buffer as a scratch space. Returns a new exactly sized +/// buffer. +/// This allows reusing the buffer across multiple encode calls to optimize allocations and +/// resulting buffer sizes. +fn encode_task_data( + task: TaskId, + data: &TaskStorage, + category: SpecificTaskDataCategory, + scratch_buffer: &mut TurboBincodeBuffer, +) -> Result { + scratch_buffer.clear(); + let mut encoder = new_turbo_bincode_encoder(scratch_buffer); + data.encode(category, &mut encoder)?; + + if cfg!(feature = "verify_serialization") { + TaskStorage::new() + .decode( + category, + &mut new_turbo_bincode_decoder(&scratch_buffer[..]), + ) + .with_context(|| { + format!( + "expected to be able to decode serialized data for '{category:?}' information \ + for {task}" + ) + })?; + } + Ok(SmallVec::from_slice(scratch_buffer)) +} diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index 67a462242f0403..f9578a23fc7284 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -5,6 +5,7 @@ use std::{ }; use smallvec::SmallVec; +use turbo_bincode::TurboBincodeBuffer; use turbo_tasks::{FxDashMap, TaskId, parallel}; use crate::{ @@ -106,13 +107,15 @@ impl Storage { /// the results. Ends snapshot mode afterwards. /// preprocess is potentially called within a lock, so it should be fast. /// process is called outside of locks, so it could do more expensive operations. + /// Both process and process_snapshot receive a mutable scratch buffer that can be reused + /// across iterations to avoid repeated allocations. pub fn take_snapshot< 'l, T, R, PP: for<'a> Fn(TaskId, &'a TaskStorage) -> T + Sync, - P: Fn(TaskId, T) -> R + Sync, - PS: Fn(TaskId, Box) -> R + Sync, + P: Fn(TaskId, T, &mut TurboBincodeBuffer) -> R + Sync, + PS: Fn(TaskId, Box, &mut TurboBincodeBuffer) -> R + Sync, >( &'l self, preprocess: &'l PP, @@ -153,7 +156,9 @@ impl Storage { // Safety: guard must outlive the iterator. drop(guard); } - + /// How big of a buffer to allocate initially. Based on metrics from a large + /// application this should cover about 98% of values with no resizes + const SCRATCH_BUFFER_SIZE: usize = 4096; SnapshotShard { direct_snapshots, modified, @@ -162,6 +167,7 @@ impl Storage { process, preprocess, process_snapshot, + scratch_buffer: TurboBincodeBuffer::with_capacity(SCRATCH_BUFFER_SIZE), } }) } @@ -362,26 +368,36 @@ pub struct SnapshotShard<'l, PP, P, PS> { process: &'l P, preprocess: &'l PP, process_snapshot: &'l PS, + /// Scratch buffer for encoding task data, reused across iterations to avoid allocations + scratch_buffer: TurboBincodeBuffer, } impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS> where PP: for<'a> Fn(TaskId, &'a TaskStorage) -> T + Sync, - P: Fn(TaskId, T) -> R + Sync, - PS: Fn(TaskId, Box) -> R + Sync, + P: Fn(TaskId, T, &mut TurboBincodeBuffer) -> R + Sync, + PS: Fn(TaskId, Box, &mut TurboBincodeBuffer) -> R + Sync, { type Item = R; fn next(&mut self) -> Option { if let Some((task_id, snapshot)) = self.direct_snapshots.pop() { - return Some((self.process_snapshot)(task_id, snapshot)); + return Some((self.process_snapshot)( + task_id, + snapshot, + &mut self.scratch_buffer, + )); } while let Some(task_id) = self.modified.pop() { let inner = self.storage.map.get(&task_id).unwrap(); if !inner.flags.any_snapshot() { let preprocessed = (self.preprocess)(task_id, &inner); drop(inner); - return Some((self.process)(task_id, preprocessed)); + return Some((self.process)( + task_id, + preprocessed, + &mut self.scratch_buffer, + )); } else { drop(inner); let maybe_snapshot = { @@ -392,7 +408,11 @@ where snapshot.take() }; if let Some(snapshot) = maybe_snapshot { - return Some((self.process_snapshot)(task_id, snapshot)); + return Some((self.process_snapshot)( + task_id, + snapshot, + &mut self.scratch_buffer, + )); } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index c927ee44018c93..ead13c1e61fa07 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -43,12 +43,6 @@ pub trait BackingStorageSealed: 'static + Send + Sync { type ReadTransaction<'l>; fn next_free_task_id(&self) -> Result; fn uncompleted_operations(&self) -> Result>; - fn serialize( - &self, - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, - ) -> Result>; fn save_snapshot( &self, @@ -126,14 +120,7 @@ where fn uncompleted_operations(&self) -> Result> { either::for_both!(self, this => this.uncompleted_operations()) } - fn serialize( - &self, - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, - ) -> Result> { - either::for_both!(self, this => this.serialize(task, data, category)) - } + fn save_snapshot( &self, operations: Vec>, diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 198079c9abfb23..dd8529dbad3ad7 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -7,8 +7,8 @@ use std::{ use anyhow::{Context, Result}; use turbo_bincode::{ - TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder, turbo_bincode_decode, - turbo_bincode_encode, turbo_bincode_encode_into, + TurboBincodeBuffer, new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode, + turbo_bincode_encode_into, }; use turbo_tasks::{ TaskId, @@ -250,15 +250,6 @@ impl BackingStorageSealed get(&self.inner.database).context("Unable to read uncompleted operations from database") } - fn serialize( - &self, - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, - ) -> Result { - encode_task_data(task, data, category) - } - fn save_snapshot( &self, operations: Vec>, @@ -797,30 +788,3 @@ where Ok(result) }) } - -fn encode_task_data( - task: TaskId, - data: &TaskStorage, - category: SpecificTaskDataCategory, -) -> Result { - // TODO: see if the caller can pass us a buffer instead of us allocating a new one. - // This should be possible and save a lot of small allocations. - let mut buffer = TurboBincodeBuffer::new(); - let mut encoder = new_turbo_bincode_encoder(&mut buffer); - data.encode(category, &mut encoder)?; - - if !cfg!(feature = "verify_serialization") { - return Ok(buffer); - } - - TaskStorage::new() - .decode(category, &mut new_turbo_bincode_decoder(buffer.borrow())) - .with_context(|| { - format!( - "expected to be able to decode serialized data for '{category:?}' information for \ - {task}" - ) - })?; - - Ok(buffer) -}