diff --git a/timely/src/progress/change_batch.rs b/timely/src/progress/change_batch.rs index 6a3c633b4..f8b766393 100644 --- a/timely/src/progress/change_batch.rs +++ b/timely/src/progress/change_batch.rs @@ -56,6 +56,24 @@ impl ChangeBatch { } } + /// Constructs a `ChangeBatch` from updates. + /// + /// # Examples + /// + ///``` + /// use timely::progress::ChangeBatch; + /// + /// let updates = [(5, 1), (5, -1)].into(); + /// let mut batch = ChangeBatch::from_updates(updates); + /// assert!(batch.is_empty()); + ///``` + pub fn from_updates(updates: SmallVec<[(T, i64); X]>) -> Self { + Self { + updates, + clean: 0, + } + } + /// Returns `true` if the change batch is not guaranteed compact. pub fn is_dirty(&self) -> bool { self.updates.len() > self.clean diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 921886c53..5c76ae0e2 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -591,10 +591,16 @@ impl Tracker { // By filtering the changes through `self.pointstamps` we react only to discrete // changes in the frontier, rather than changes in the pointstamp counts that // witness that frontier. - for ((target, time), diff) in self.target_changes.drain() { + + // For both target and source changes, we try to retire updates per target or source in + // bulk to avoid repeatedly updating the operator's mutable antichain. + let mut target_changes = std::mem::take(&mut self.target_changes).into_inner(); + while !target_changes.is_empty() { + let target = target_changes[0].0.0; + let update_count = target_changes.iter().position(|((t,_),_)| *t != target).unwrap_or(target_changes.len()); let operator = &mut self.per_operator[target.node].targets[target.port]; - let changes = operator.pointstamps.update_iter(Some((time, diff))); + let changes = operator.pointstamps.update_iter(target_changes.drain(..update_count).map(|((_,time),diff)| (time, diff))); for (time, diff) in changes { self.total_counts += diff; @@ -609,11 +615,17 @@ impl Tracker { self.worklist.push(Reverse((time, Location::from(target), diff))); } } + // Recycle `target_changes` allocation. + debug_assert!(target_changes.is_empty()); + self.target_changes = ChangeBatch::from_updates(target_changes); - for ((source, time), diff) in self.source_changes.drain() { + let mut source_changes = std::mem::take(&mut self.source_changes).into_inner(); + while !source_changes.is_empty() { + let source = source_changes[0].0.0; + let update_count = source_changes.iter().position(|((s,_),_)| *s != source).unwrap_or(source_changes.len()); let operator = &mut self.per_operator[source.node].sources[source.port]; - let changes = operator.pointstamps.update_iter(Some((time, diff))); + let changes = operator.pointstamps.update_iter(source_changes.drain(..update_count).map(|((_,time),diff)| (time, diff))); for (time, diff) in changes { self.total_counts += diff; @@ -628,6 +640,9 @@ impl Tracker { self.worklist.push(Reverse((time, Location::from(source), diff))); } } + // Recycle `source_changes` allocation. + debug_assert!(source_changes.is_empty()); + self.source_changes = ChangeBatch::from_updates(source_changes); // Step 2: Circulate implications of changes to `self.pointstamps`. //