Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions turbopack/crates/turbo-persistence/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::mem::take;

use crate::{
ValueBuffer,
collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey},
collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey, TINY_VALUE_THRESHOLD},
constants::{
DATA_THRESHOLD_PER_INITIAL_FILE, MAX_ENTRIES_PER_INITIAL_FILE, MAX_SMALL_VALUE_SIZE,
},
Expand Down Expand Up @@ -47,11 +47,19 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
};
let value = if value.len() > MAX_SMALL_VALUE_SIZE {
CollectorEntryValue::Medium {
value: value.into_vec(),
value: value.into_boxed_slice(),
}
} else if value.len() <= TINY_VALUE_THRESHOLD {
let slice: &[u8] = &value;
let mut arr = [0u8; TINY_VALUE_THRESHOLD];
arr[..slice.len()].copy_from_slice(slice);
CollectorEntryValue::Tiny {
value: arr,
len: slice.len() as u8,
}
} else {
CollectorEntryValue::Small {
value: value.into_small_vec(),
value: value.into_boxed_slice(),
}
};
self.total_key_size += key.len();
Expand Down
31 changes: 26 additions & 5 deletions turbopack/crates/turbo-persistence/src/collector_entry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::cmp::Ordering;

use smallvec::SmallVec;

use crate::{
key::StoreKey,
static_sorted_file_builder::{Entry, EntryValue},
Expand All @@ -12,16 +10,34 @@ pub struct CollectorEntry<K: StoreKey> {
pub value: CollectorEntryValue,
}

/// The size threshold for inline storage in CollectorEntryValue, this is the largest value that can
/// be stored inline without inflating the size of the enum
pub const TINY_VALUE_THRESHOLD: usize = 22;

pub enum CollectorEntryValue {
Small { value: SmallVec<[u8; 16]> },
Medium { value: Vec<u8> },
Large { blob: u32 },
/// Tiny value stored inline (22 16 bytes, no heap allocation)
Tiny {
value: [u8; TINY_VALUE_THRESHOLD],
len: u8,
},
/// Small value that fits in shared value blocks (> 16 bytes, ≤ MAX_SMALL_VALUE_SIZE)
Small {
value: Box<[u8]>,
},
/// Medium value that gets its own value block (> MAX_SMALL_VALUE_SIZE)
Medium {
value: Box<[u8]>,
},
Large {
blob: u32,
},
Deleted,
}

impl CollectorEntryValue {
pub fn len(&self) -> usize {
match self {
CollectorEntryValue::Tiny { len, .. } => *len as usize,
CollectorEntryValue::Small { value } => value.len(),
CollectorEntryValue::Medium { value } => value.len(),
CollectorEntryValue::Large { blob: _ } => 0,
Expand Down Expand Up @@ -78,6 +94,11 @@ impl<K: StoreKey> Entry for CollectorEntry<K> {

fn value(&self) -> EntryValue<'_> {
match &self.value {
// Tiny values are stored the same way as Small in the SST file, they just have an
// optimized representation here
CollectorEntryValue::Tiny { value, len } => EntryValue::Small {
value: &value[..*len as usize],
},
CollectorEntryValue::Small { value } => EntryValue::Small { value },
CollectorEntryValue::Medium { value } => EntryValue::Medium { value },
CollectorEntryValue::Large { blob } => EntryValue::Large { blob: *blob },
Expand Down
16 changes: 4 additions & 12 deletions turbopack/crates/turbo-persistence/src/value_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,11 @@ pub enum ValueBuffer<'l> {
}

impl ValueBuffer<'_> {
pub fn into_vec(self) -> Vec<u8> {
pub fn into_boxed_slice(self) -> Box<[u8]> {
match self {
ValueBuffer::Borrowed(b) => b.to_vec(),
ValueBuffer::Vec(v) => v,
ValueBuffer::SmallVec(sv) => sv.into_vec(),
}
}

pub fn into_small_vec(self) -> SmallVec<[u8; 16]> {
match self {
ValueBuffer::Borrowed(b) => SmallVec::from_slice(b),
ValueBuffer::Vec(v) => SmallVec::from_vec(v),
ValueBuffer::SmallVec(sv) => sv,
ValueBuffer::Borrowed(b) => b.into(),
ValueBuffer::Vec(v) => v.into_boxed_slice(),
ValueBuffer::SmallVec(sv) => sv.into_vec().into_boxed_slice(),
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,24 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
}
}
}
// After flushing write all the full global collectors to disk.
// TODO: This can distribute work unfairly
// * a thread could fill up multiple global collectors and then get stuck writing them all
// out, if multiple threads could work on it we could take care of spare IO parallism
// * we can also have too much IO parallism with many threads concurrently writing files.
//
// Ideally we would limit the amount of data buffered in memory and control the amount of IO
// parallism. Consider:
// * store full-buffers as a field on WireBatch (queued writes)
// * each thread will attempt to poll and flush a full buffer after flushing its local
// buffer.
// This will distribute the writing work more fairly, but now we have the problem of to
// many concurrent writes contending for filesystem locks. So we could also use a semaphore
// to restrict how many concurrent writes occur. But then we would accumulate 'fullBuffers'
// leading to too much memory consumption. So really we also need to slow down the threads
// submitting work data. To do this we could simply use a tokio semaphore and make all
// these operations async, or we could integrate with the parallel::map operation that is
// driving the work to slow down task submission in this case.
for mut global_collector in full_collectors {
// When the global collector is full, we create a new SST file.
let sst = self.create_sst_file(family, global_collector.sorted())?;
Expand Down
83 changes: 52 additions & 31 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use std::{
},
};

use anyhow::{Result, bail};
use anyhow::{Context, Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
use tracing::{Span, trace_span};
use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
use turbo_tasks::{
CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
ReadOutputOptions, ReadTracking, SharedReference, TRANSIENT_TASK_BIT, TaskExecutionReason,
Expand Down Expand Up @@ -1017,42 +1018,32 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

(meta, data)
};
let process =
|task_id: TaskId, (meta, data): (Option<TaskStorage>, Option<TaskStorage>)| {
// TODO: perf: Instead of returning a `Vec` of individually allocated `SmallVec`s,
// it'd be better to append everything to a flat per-task or
// per-shard `Vec<u8>`, and have each `serialize` call return
// `(start_idx, end_idx)`.
let process = |task_id: TaskId,
(meta, data): (Option<TaskStorage>, Option<TaskStorage>),
buffer: &mut TurboBincodeBuffer| {
(
task_id,
meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
)
};
let process_snapshot =
|task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
if task_id.is_transient() {
return (task_id, None, None);
}

// Encode meta/data directly from TaskStorage snapshot
(
task_id,
meta.map(|d| {
self.backing_storage
.serialize(task_id, &d, SpecificTaskDataCategory::Meta)
inner.flags.meta_modified().then(|| {
encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
}),
data.map(|d| {
self.backing_storage
.serialize(task_id, &d, SpecificTaskDataCategory::Data)
inner.flags.data_modified().then(|| {
encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
}),
)
};
let process_snapshot = |task_id: TaskId, inner: Box<TaskStorage>| {
if task_id.is_transient() {
return (task_id, None, None);
}

// Encode meta/data directly from TaskStorage snapshot
(
task_id,
inner.flags.meta_modified().then(|| {
self.backing_storage
.serialize(task_id, &inner, SpecificTaskDataCategory::Meta)
}),
inner.flags.data_modified().then(|| {
self.backing_storage
.serialize(task_id, &inner, SpecificTaskDataCategory::Data)
}),
)
};

let snapshot = self
.storage
Expand Down Expand Up @@ -3537,3 +3528,33 @@ fn far_future() -> Instant {
// 1000 years overflows on macOS, 100 years overflows on FreeBSD.
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}

/// Encodes task data, using the provided buffer as a scratch space. Returns a new exactly sized
/// buffer.
/// This allows reusing the buffer across multiple encode calls to optimize allocations and
/// resulting buffer sizes.
fn encode_task_data(
task: TaskId,
data: &TaskStorage,
category: SpecificTaskDataCategory,
scratch_buffer: &mut TurboBincodeBuffer,
) -> Result<TurboBincodeBuffer> {
scratch_buffer.clear();
let mut encoder = new_turbo_bincode_encoder(scratch_buffer);
data.encode(category, &mut encoder)?;

if cfg!(feature = "verify_serialization") {
TaskStorage::new()
.decode(
category,
&mut new_turbo_bincode_decoder(&scratch_buffer[..]),
)
.with_context(|| {
format!(
"expected to be able to decode serialized data for '{category:?}' information \
for {task}"
)
})?;
}
Ok(SmallVec::from_slice(scratch_buffer))
}
36 changes: 28 additions & 8 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use smallvec::SmallVec;
use turbo_bincode::TurboBincodeBuffer;
use turbo_tasks::{FxDashMap, TaskId, parallel};

use crate::{
Expand Down Expand Up @@ -106,13 +107,15 @@ impl Storage {
/// the results. Ends snapshot mode afterwards.
/// preprocess is potentially called within a lock, so it should be fast.
/// process is called outside of locks, so it could do more expensive operations.
/// Both process and process_snapshot receive a mutable scratch buffer that can be reused
/// across iterations to avoid repeated allocations.
pub fn take_snapshot<
'l,
T,
R,
PP: for<'a> Fn(TaskId, &'a TaskStorage) -> T + Sync,
P: Fn(TaskId, T) -> R + Sync,
PS: Fn(TaskId, Box<TaskStorage>) -> R + Sync,
P: Fn(TaskId, T, &mut TurboBincodeBuffer) -> R + Sync,
PS: Fn(TaskId, Box<TaskStorage>, &mut TurboBincodeBuffer) -> R + Sync,
>(
&'l self,
preprocess: &'l PP,
Expand Down Expand Up @@ -153,7 +156,9 @@ impl Storage {
// Safety: guard must outlive the iterator.
drop(guard);
}

/// How big of a buffer to allocate initially. Based on metrics from a large
/// application this should cover about 98% of values with no resizes
const SCRATCH_BUFFER_SIZE: usize = 4096;
SnapshotShard {
direct_snapshots,
modified,
Expand All @@ -162,6 +167,7 @@ impl Storage {
process,
preprocess,
process_snapshot,
scratch_buffer: TurboBincodeBuffer::with_capacity(SCRATCH_BUFFER_SIZE),
}
})
}
Expand Down Expand Up @@ -362,26 +368,36 @@ pub struct SnapshotShard<'l, PP, P, PS> {
process: &'l P,
preprocess: &'l PP,
process_snapshot: &'l PS,
/// Scratch buffer for encoding task data, reused across iterations to avoid allocations
scratch_buffer: TurboBincodeBuffer,
}

impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
where
PP: for<'a> Fn(TaskId, &'a TaskStorage) -> T + Sync,
P: Fn(TaskId, T) -> R + Sync,
PS: Fn(TaskId, Box<TaskStorage>) -> R + Sync,
P: Fn(TaskId, T, &mut TurboBincodeBuffer) -> R + Sync,
PS: Fn(TaskId, Box<TaskStorage>, &mut TurboBincodeBuffer) -> R + Sync,
{
type Item = R;

fn next(&mut self) -> Option<Self::Item> {
if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
return Some((self.process_snapshot)(task_id, snapshot));
return Some((self.process_snapshot)(
task_id,
snapshot,
&mut self.scratch_buffer,
));
}
while let Some(task_id) = self.modified.pop() {
let inner = self.storage.map.get(&task_id).unwrap();
if !inner.flags.any_snapshot() {
let preprocessed = (self.preprocess)(task_id, &inner);
drop(inner);
return Some((self.process)(task_id, preprocessed));
return Some((self.process)(
task_id,
preprocessed,
&mut self.scratch_buffer,
));
} else {
drop(inner);
let maybe_snapshot = {
Expand All @@ -392,7 +408,11 @@ where
snapshot.take()
};
if let Some(snapshot) = maybe_snapshot {
return Some((self.process_snapshot)(task_id, snapshot));
return Some((self.process_snapshot)(
task_id,
snapshot,
&mut self.scratch_buffer,
));
}
}
}
Expand Down
15 changes: 1 addition & 14 deletions turbopack/crates/turbo-tasks-backend/src/backing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ pub trait BackingStorageSealed: 'static + Send + Sync {
type ReadTransaction<'l>;
fn next_free_task_id(&self) -> Result<TaskId>;
fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>>;
fn serialize(
&self,
task: TaskId,
data: &TaskStorage,
category: SpecificTaskDataCategory,
) -> Result<SmallVec<[u8; 16]>>;

fn save_snapshot<I>(
&self,
Expand Down Expand Up @@ -126,14 +120,7 @@ where
fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>> {
either::for_both!(self, this => this.uncompleted_operations())
}
fn serialize(
&self,
task: TaskId,
data: &TaskStorage,
category: SpecificTaskDataCategory,
) -> Result<SmallVec<[u8; 16]>> {
either::for_both!(self, this => this.serialize(task, data, category))
}

fn save_snapshot<I>(
&self,
operations: Vec<Arc<AnyOperation>>,
Expand Down
Loading
Loading