From 131578eab87320d7b14a01879b2c4377c0c4556d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 29 Aug 2025 10:59:14 +0200 Subject: [PATCH] Remove IterContainer This change removes the IterContainer type and reworks other types and implementations to function independently. Notably, this is a breaking change for users of exotic containers: * The `Inspect` trait defers to `&C: IntoIterator` to reveal items in containers. Not all currently used containers provide this. Vec does, but Rc/Arc and Column don't and do not plan to in the future. Users should pivot to `inspect_container` instead and internalize the iteration in the closure. * The `DrainContainer` implementaiton for Rc/Arc depends on the wrapped type implementing `IntoIterator` for references. This limits where wrapped containers that do not provide this can be used. It's mostly limited to the core Timely layer that doesn't provide element-by-element functionality. I feel this change is net positive as it defers to the Rust API to iterate existing types, instead of us providing our own infrastructure. It comes at a cost of potentially breaking existing code, which seems acceptable. Signed-off-by: Moritz Hoffmann --- container/src/lib.rs | 79 ++++++------------- timely/examples/columnar.rs | 29 ++++--- timely/src/dataflow/operators/core/input.rs | 4 +- timely/src/dataflow/operators/core/inspect.rs | 19 +++-- timely/src/dataflow/operators/core/rc.rs | 7 +- 5 files changed, 58 insertions(+), 80 deletions(-) diff --git a/container/src/lib.rs b/container/src/lib.rs index 7f8447d9d..1b12c4882 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -4,7 +4,7 @@ use std::collections::VecDeque; -/// An type containing a number of records accounted for by progress tracking. +/// A type containing a number of records accounted for by progress tracking. /// /// The object stores a number of updates and thus is able to describe it count /// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the @@ -19,23 +19,10 @@ pub trait Accountable { fn record_count(&self) -> i64; /// Determine if this contains any updates, corresponding to `update_count() == 0`. - /// It is a correctness error for this to by anything other than `self.record_count() == 0`. + /// It is a correctness error for this to be anything other than `self.record_count() == 0`. #[inline] fn is_empty(&self) -> bool { self.record_count() == 0 } } -/// A container that allows iteration morally equivalent to [`IntoIterator`]. -/// -/// Iterating the container presents items in an implementation-specific order. -/// The container's contents are not changed. -pub trait IterContainer { - /// The type of elements when reading non-destructively from the container. - type ItemRef<'a> where Self: 'a; - /// Iterator type when reading from the container. - type Iter<'a>: Iterator> where Self: 'a; - /// Returns an iterator that reads the contents of this container. - fn iter(&self) -> Self::Iter<'_>; -} - /// A container that can drain itself. /// /// Draining the container presents items in an implementation-specific order. @@ -191,14 +178,6 @@ impl Accountable for Vec { #[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) } } -impl IterContainer for Vec { - type ItemRef<'a> = &'a T where T: 'a; - type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a; - #[inline] fn iter(&self) -> Self::Iter<'_> { - self.as_slice().iter() - } -} - impl DrainContainer for Vec { type Item<'a> = T where T: 'a; type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a; @@ -246,46 +225,32 @@ impl PushInto<&&T> for Vec { } mod rc { - use std::ops::Deref; - use std::rc::Rc; - - use crate::{IterContainer, DrainContainer}; - - impl crate::Accountable for Rc { - #[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() } - #[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() } - } - impl IterContainer for Rc { - type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; - type Iter<'a> = T::Iter<'a> where Self: 'a; - #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() } + impl crate::Accountable for std::rc::Rc { + #[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() } + #[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() } } - impl DrainContainer for Rc { - type Item<'a> = T::ItemRef<'a> where Self: 'a; - type DrainIter<'a> = T::Iter<'a> where Self: 'a; - #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() } + impl crate::DrainContainer for std::rc::Rc + where + for<'a> &'a T: IntoIterator + { + type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a; + type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a; + #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() } } } mod arc { - use std::ops::Deref; - use std::sync::Arc; - - use crate::{IterContainer, DrainContainer}; - - impl crate::Accountable for Arc { - #[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() } - #[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() } - } - impl IterContainer for Arc { - type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; - type Iter<'a> = T::Iter<'a> where Self: 'a; - #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() } + impl crate::Accountable for std::sync::Arc { + #[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() } + #[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() } } - impl DrainContainer for Arc { - type Item<'a> = T::ItemRef<'a> where Self: 'a; - type DrainIter<'a> = T::Iter<'a> where Self: 'a; - #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() } + impl crate::DrainContainer for std::sync::Arc + where + for<'a> &'a T: IntoIterator + { + type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a; + type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a; + #[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() } } } diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index d858aac79..1348e4984 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -2,10 +2,12 @@ use std::collections::HashMap; -use timely::container::{IterContainer, CapacityContainerBuilder}; +use columnar::Index; +use timely::Accountable; +use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::InputHandleCore; -use timely::dataflow::operators::{Inspect, Operator, Probe}; +use timely::dataflow::operators::{InspectCore, Operator, Probe}; use timely::dataflow::ProbeHandle; // Creates `WordCountContainer` and `WordCountReference` structs, @@ -44,7 +46,7 @@ fn main() { move |input, output| { while let Some((time, data)) = input.next() { let mut session = output.session(&time); - for wordcount in data.iter().flat_map(|wordcount| { + for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| { wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff }) }) { session.give(wordcount); @@ -73,7 +75,7 @@ fn main() { if !input.frontier().less_equal(key.time()) { let mut session = output.session(key); for batch in val.drain(..) { - for wordcount in batch.iter() { + for wordcount in batch.borrow().into_index_iter() { let total = if let Some(count) = counts.get_mut(wordcount.text) { *count += wordcount.diff; @@ -94,7 +96,17 @@ fn main() { }, ) .container::() - .inspect(|x| println!("seen: {:?}", x)) + .inspect_container(|x| { + match x { + Ok((time, data)) => { + println!("seen at: {:?}\t{:?} records", time, data.record_count()); + for wc in data.borrow().into_index_iter() { + println!(" {}: {}", wc.text, wc.diff); + } + }, + Err(frontier) => println!("frontier advanced to {:?}", frontier), + } + }) .probe_with(&probe); }); @@ -167,7 +179,7 @@ mod container { impl Column { /// Borrows the contents no matter their representation. - #[inline(always)] fn borrow(&self) -> C::Borrowed<'_> { + #[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { match self { Column::Typed(t) => t.borrow(), Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))), @@ -180,11 +192,6 @@ mod container { #[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() } #[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() } } - impl timely::container::IterContainer for Column { - type ItemRef<'a> = C::Ref<'a>; - type Iter<'a> = IterOwn>; - fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() } - } impl timely::container::DrainContainer for Column { type Item<'a> = C::Ref<'a>; type DrainIter<'a> = IterOwn>; diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 8db72755e..6a71f63b4 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -75,7 +75,7 @@ pub trait Input : Scope { /// ``` /// use std::rc::Rc; /// use timely::*; - /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::dataflow::operators::core::{Input, InspectCore}; /// use timely::container::CapacityContainerBuilder; /// /// // construct and execute a timely dataflow @@ -84,7 +84,7 @@ pub trait Input : Scope { /// // add an input and base computation off of it /// let mut input = worker.dataflow(|scope| { /// let (input, stream) = scope.new_input_with_builder::>>>(); - /// stream.inspect(|x| println!("hello {:?}", x)); + /// stream.inspect_container(|x| println!("hello {:?}", x)); /// input /// }); /// diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index 6491b9d54..0058d2fcf 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -1,13 +1,15 @@ //! Extension trait and implementation for observing and action on streamed data. use crate::Container; -use crate::container::IterContainer; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. -pub trait Inspect: InspectCore + Sized { +pub trait Inspect: InspectCore + Sized +where + for<'a> &'a C: IntoIterator, +{ /// Runs a supplied closure on each observed data element. /// /// # Examples @@ -21,10 +23,10 @@ pub trait Inspect: InspectCore + Sized { /// ``` fn inspect(&self, mut func: F) -> Self where - F: for<'a> FnMut(C::ItemRef<'a>) + 'static, + F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static, { self.inspect_batch(move |_, data| { - for datum in data.iter() { func(datum); } + for datum in data.into_iter() { func(datum); } }) } @@ -41,10 +43,10 @@ pub trait Inspect: InspectCore + Sized { /// ``` fn inspect_time(&self, mut func: F) -> Self where - F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static, + F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static, { self.inspect_batch(move |time, data| { - for datum in data.iter() { + for datum in data.into_iter() { func(time, datum); } }) @@ -91,7 +93,10 @@ pub trait Inspect: InspectCore + Sized { fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl Inspect for StreamCore { +impl Inspect for StreamCore +where + for<'a> &'a C: IntoIterator, +{ fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { self.inspect_container(func) } diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index e9c622bc7..4b996a3f9 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -12,13 +12,13 @@ pub trait SharedStream { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::{ToStream, InspectCore}; /// use timely::dataflow::operators::rc::SharedStream; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .shared() - /// .inspect(|x| println!("seen: {:?}", x)); + /// .inspect_container(|x| println!("seen: {:?}", x)); /// }); /// ``` fn shared(&self) -> StreamCore>; @@ -43,12 +43,13 @@ mod test { use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::capture::Extract; use crate::dataflow::operators::rc::SharedStream; - use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream}; + use crate::dataflow::operators::{Capture, Concatenate, InspectCore, Operator, ToStream}; #[test] fn test_shared() { let output = crate::example(|scope| { let shared = vec![Ok(0), Err(())].to_stream(scope).container::>().shared(); + let shared = shared.inspect_container(|x| println!("seen: {x:?}")); scope .concatenate([ shared.unary(Pipeline, "read shared 1", |_, _| {