Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions turbopack/crates/turbo-tasks-backend/src/backend/counter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,7 @@ impl<K, V> CounterMap<K, V> {
{
self.0.get(key)
}
// TODO(lukesandberg): this is just here for the CachedDataItem adaptor layer, can be removed
// once that is gone.
#[doc(hidden)]
pub fn get_mut(&mut self, _key: &K) -> Option<&mut V>
where
K: Eq + Hash,
{
unreachable!("This should never be called, please insert instead")
}

pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.0.iter()
}
Expand Down
600 changes: 259 additions & 341 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@ use turbo_tasks::TaskId;

use crate::{
backend::{
TaskDataCategory, get, get_many,
TaskDataCategory,
operation::{
AggregatedDataUpdate, ExecuteContext, Operation,
aggregation_update::{
AggregationUpdateJob, AggregationUpdateQueue, InnerOfUppersLostFollowersJob,
get_aggregation_number, get_uppers, is_aggregating_node,
},
},
storage::update_count,
storage_schema::TaskStorageAccessors,
},
data::{CachedDataItemKey, CellRef, CollectibleRef, CollectiblesRef},
data::{CellRef, CollectibleRef, CollectiblesRef},
};

#[derive(Encode, Decode, Clone, Default)]
Expand Down Expand Up @@ -50,7 +49,7 @@ impl CleanupOldEdgesOperation {
task_id: TaskId,
outdated: Vec<OutdatedEdge>,
queue: AggregationUpdateQueue,
ctx: &mut impl ExecuteContext,
ctx: &mut impl ExecuteContext<'_>,
) {
CleanupOldEdgesOperation::RemoveEdges {
task_id,
Expand All @@ -62,7 +61,7 @@ impl CleanupOldEdgesOperation {
}

impl Operation for CleanupOldEdgesOperation {
fn execute(mut self, ctx: &mut impl ExecuteContext) {
fn execute(mut self, ctx: &mut impl ExecuteContext<'_>) {
loop {
ctx.operation_suspend_point(&self);
match self {
Expand All @@ -84,8 +83,8 @@ impl Operation for CleanupOldEdgesOperation {
_ => true,
});
let mut task = ctx.task(task_id, TaskDataCategory::All);
for &child_id in children.iter() {
task.remove(&CachedDataItemKey::Child { task: child_id });
for task_id in children.iter() {
task.remove_children(task_id);
}
if is_aggregating_node(get_aggregation_number(&task)) {
queue.push(AggregationUpdateJob::InnerOfUpperLostFollowers {
Expand All @@ -96,7 +95,8 @@ impl Operation for CleanupOldEdgesOperation {
} else {
let upper_ids = get_uppers(&task);
let has_active_count = ctx.should_track_activeness()
&& get!(task, Activeness)
&& task
.get_activeness()
.is_some_and(|a| a.active_counter > 0);
drop(task);
if has_active_count {
Expand Down Expand Up @@ -127,19 +127,20 @@ impl Operation for CleanupOldEdgesOperation {
let mut task = ctx.task(task_id, TaskDataCategory::All);
let mut emptied_collectables = FxHashSet::default();
for (collectible, count) in collectibles.iter_mut() {
if update_count!(
task,
Collectible {
collectible: *collectible
},
*count
) {
if task
.update_collectibles_positive_crossing(*collectible, *count)
{
emptied_collectables.insert(collectible.collectible_type);
}
}

for ty in emptied_collectables {
let task_ids = get_many!(task, CollectiblesDependent { collectible_type, task } if collectible_type == ty => { task });
let task_ids: SmallVec<[_; 4]> = task
.iter_collectibles_dependents()
.filter_map(|(collectible_type, task)| {
(collectible_type == ty).then_some(task)
})
.collect();
queue.push(
AggregationUpdateJob::InvalidateDueToCollectiblesChange {
task_ids,
Expand All @@ -162,21 +163,17 @@ impl Operation for CleanupOldEdgesOperation {
) => {
{
let mut task = ctx.task(cell_task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::CellDependent {
cell,
key,
task: task_id,
});
task.remove_cell_dependents(&(cell, key, task_id));
}
{
let mut task = ctx.task(task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::CellDependency {
target: CellRef {
task.remove_cell_dependencies(&(
CellRef {
task: cell_task_id,
cell,
},
key,
});
));
}
}
OutdatedEdge::OutputDependency(output_task_id) => {
Expand All @@ -189,15 +186,11 @@ impl Operation for CleanupOldEdgesOperation {
.entered();
{
let mut task = ctx.task(output_task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::OutputDependent {
task: task_id,
});
task.remove_output_dependent(&task_id);
}
{
let mut task = ctx.task(task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::OutputDependency {
target: output_task_id,
});
task.remove_output_dependencies(&output_task_id);
}
}
OutdatedEdge::CollectiblesDependency(CollectiblesRef {
Expand All @@ -207,18 +200,16 @@ impl Operation for CleanupOldEdgesOperation {
{
let mut task =
ctx.task(dependent_task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::CollectiblesDependent {
task.remove_collectibles_dependents(&(
collectible_type,
task: task_id,
});
task_id,
));
}
{
let mut task = ctx.task(task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::CollectiblesDependency {
target: CollectiblesRef {
collectible_type,
task: dependent_task_id,
},
task.remove_collectibles_dependencies(&CollectiblesRef {
collectible_type,
task: dependent_task_id,
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use turbo_tasks::{TaskExecutionReason, TaskId};

use crate::{
backend::{
TaskDataCategory, get_mut,
TaskDataCategory,
operation::{
ExecuteContext, Operation,
ExecuteContext, Operation, TaskGuard,
aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue},
},
storage_schema::TaskStorageAccessors,
},
data::{CachedDataItem, CachedDataItemKey, InProgressState, InProgressStateInner},
data::{InProgressState, InProgressStateInner},
};

#[derive(Encode, Decode, Clone, Default)]
Expand All @@ -33,7 +33,7 @@ impl ConnectChildOperation {
let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All);
let Some(InProgressState::InProgress(box InProgressStateInner {
new_children, ..
})) = get_mut!(parent_task, InProgress)
})) = parent_task.get_in_progress_mut()
else {
panic!("Task is not in progress while calling another task: {parent_task:?}");
};
Expand All @@ -43,9 +43,7 @@ impl ConnectChildOperation {
return;
}

if parent_task.has_key(&CachedDataItemKey::Child {
task: child_task_id,
}) {
if parent_task.children_contains(&child_task_id) {
// It is already connected, we can skip the rest
return;
}
Expand All @@ -69,11 +67,10 @@ impl ConnectChildOperation {
} else {
let mut child_task = ctx.task(child_task_id, TaskDataCategory::All);

if !child_task.has_key(&CachedDataItemKey::Output {})
&& child_task.add(CachedDataItem::new_scheduled(
TaskExecutionReason::Connect,
|| ctx.get_task_desc_fn(child_task_id),
))
if !child_task.has_output()
&& child_task.add_scheduled(TaskExecutionReason::Connect, || {
ctx.get_task_desc_fn(child_task_id)
})
{
ctx.schedule_task(child_task, ctx.get_current_task_priority());
}
Expand All @@ -87,7 +84,7 @@ impl ConnectChildOperation {
}

impl Operation for ConnectChildOperation {
fn execute(mut self, ctx: &mut impl ExecuteContext) {
fn execute(mut self, ctx: &mut impl ExecuteContext<'_>) {
loop {
ctx.operation_suspend_point(&self);
match self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ use turbo_tasks::{
util::{good_chunk_size, into_chunks},
};

use crate::{
backend::operation::{
AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext, ExecuteContext,
Operation, TaskGuard, aggregation_update::InnerOfUppersHasNewFollowersJob,
get_aggregation_number, get_uppers, is_aggregating_node,
},
data::{CachedDataItem, CachedDataItemType},
use crate::backend::operation::{
AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext, ExecuteContext, Operation,
TaskGuard, aggregation_update::InnerOfUppersHasNewFollowersJob, get_aggregation_number,
get_uppers, is_aggregating_node,
};

pub fn connect_children(
Expand All @@ -27,12 +24,13 @@ pub fn connect_children(

let parent_aggregation = get_aggregation_number(&parent_task);

parent_task.extend_new(
CachedDataItemType::Child,
new_children.iter().map(|&new_child| CachedDataItem::Child {
task: new_child,
value: (),
}),
let old_children = parent_task.children_len();
parent_task.extend_children(new_children.iter().copied());
debug_assert!(
old_children + new_children.len() == parent_task.children_len(),
"Attempted to connect {len} new children, but some of them were already present in \
{parent_task_id}",
len = new_children.len()
);

let new_follower_ids: SmallVec<_> = new_children.into_iter().collect();
Expand Down
Loading
Loading