From 8c5027b0d6af6bba623ce437613fb671d87fd7f0 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 27 Aug 2025 20:24:47 +0200 Subject: [PATCH] Retire target and source changes in bulk If we have many updates per source or target, `propagate_all` will update an operator's pointstamps as many times as there are updates. Updating the mutable antichain behind pointstamps can be expensive. This change extracts the updates from the target and source changes, and applies them in bulk on the pointstamps in the hope of reducing the cost of maintaining the antichain. Signed-off-by: Moritz Hoffmann --- timely/src/progress/change_batch.rs | 18 ++++++++++++++++++ timely/src/progress/reachability.rs | 23 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/timely/src/progress/change_batch.rs b/timely/src/progress/change_batch.rs index 6a3c633b4..f8b766393 100644 --- a/timely/src/progress/change_batch.rs +++ b/timely/src/progress/change_batch.rs @@ -56,6 +56,24 @@ impl ChangeBatch { } } + /// Constructs a `ChangeBatch` from updates. + /// + /// # Examples + /// + ///``` + /// use timely::progress::ChangeBatch; + /// + /// let updates = [(5, 1), (5, -1)].into(); + /// let mut batch = ChangeBatch::from_updates(updates); + /// assert!(batch.is_empty()); + ///``` + pub fn from_updates(updates: SmallVec<[(T, i64); X]>) -> Self { + Self { + updates, + clean: 0, + } + } + /// Returns `true` if the change batch is not guaranteed compact. pub fn is_dirty(&self) -> bool { self.updates.len() > self.clean diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 921886c53..5c76ae0e2 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -591,10 +591,16 @@ impl Tracker { // By filtering the changes through `self.pointstamps` we react only to discrete // changes in the frontier, rather than changes in the pointstamp counts that // witness that frontier. - for ((target, time), diff) in self.target_changes.drain() { + + // For both target and source changes, we try to retire updates per target or source in + // bulk to avoid repeatedly updating the operator's mutable antichain. + let mut target_changes = std::mem::take(&mut self.target_changes).into_inner(); + while !target_changes.is_empty() { + let target = target_changes[0].0.0; + let update_count = target_changes.iter().position(|((t,_),_)| *t != target).unwrap_or(target_changes.len()); let operator = &mut self.per_operator[target.node].targets[target.port]; - let changes = operator.pointstamps.update_iter(Some((time, diff))); + let changes = operator.pointstamps.update_iter(target_changes.drain(..update_count).map(|((_,time),diff)| (time, diff))); for (time, diff) in changes { self.total_counts += diff; @@ -609,11 +615,17 @@ impl Tracker { self.worklist.push(Reverse((time, Location::from(target), diff))); } } + // Recycle `target_changes` allocation. + debug_assert!(target_changes.is_empty()); + self.target_changes = ChangeBatch::from_updates(target_changes); - for ((source, time), diff) in self.source_changes.drain() { + let mut source_changes = std::mem::take(&mut self.source_changes).into_inner(); + while !source_changes.is_empty() { + let source = source_changes[0].0.0; + let update_count = source_changes.iter().position(|((s,_),_)| *s != source).unwrap_or(source_changes.len()); let operator = &mut self.per_operator[source.node].sources[source.port]; - let changes = operator.pointstamps.update_iter(Some((time, diff))); + let changes = operator.pointstamps.update_iter(source_changes.drain(..update_count).map(|((_,time),diff)| (time, diff))); for (time, diff) in changes { self.total_counts += diff; @@ -628,6 +640,9 @@ impl Tracker { self.worklist.push(Reverse((time, Location::from(source), diff))); } } + // Recycle `source_changes` allocation. + debug_assert!(source_changes.is_empty()); + self.source_changes = ChangeBatch::from_updates(source_changes); // Step 2: Circulate implications of changes to `self.pointstamps`. //