diff --git a/container/src/lib.rs b/container/src/lib.rs index 839f80167..fa97aff69 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -4,46 +4,32 @@ use std::collections::VecDeque; -/// A container transferring data through dataflow edges +/// A type representing progress, with an update count. /// -/// A container stores a number of elements and thus is able to describe it length (`len()`) and -/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`). +/// It describes its update count (`count()`). /// -/// A container must implement default. The default implementation is not required to allocate -/// memory for variable-length components. -/// -/// We require the container to be cloneable to enable efficient copies when providing references -/// of containers to operators. Care must be taken that the type's `clone_from` implementation -/// is efficient (which is not necessarily the case when deriving `Clone`.) +/// We require [`Default`] for convenience purposes. pub trait Container: Default { - /// The type of elements when reading non-destructively from the container. - type ItemRef<'a> where Self: 'a; - - /// The type of elements when draining the container. - type Item<'a> where Self: 'a; - - /// Push `item` into self - #[inline] - fn push(&mut self, item: T) where Self: PushInto { - self.push_into(item) - } - /// The number of elements in this container /// /// This number is used in progress tracking to confirm the receipt of some number /// of outstanding records, and it is highly load bearing. The main restriction is - /// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors + /// imposed on the [`LengthPreservingContainerBuilder`] trait, whose implementors /// must preserve the number of items. - fn len(&self) -> usize; - - /// Determine if the container contains any elements, corresponding to `len() == 0`. 
- fn is_empty(&self) -> bool { - self.len() == 0 - } + fn count(&self) -> usize; /// Remove all contents from `self` while retaining allocated memory. /// After calling `clear`, `count` must return 0. fn clear(&mut self); +} + +/// A container that can reveal its contents through iterating by reference and draining. +pub trait IterContainer: Container { + /// The type of elements when reading non-destructively from the container. + type ItemRef<'a> where Self: 'a; + + /// The type of elements when draining the container. + type Item<'a> where Self: 'a; /// Iterator type when reading from the container. type Iter<'a>: Iterator> where Self: 'a; @@ -116,8 +102,9 @@ pub trait ContainerBuilder: Default + 'static { /// Partitions `container` among `builders`, using the function `index` to direct items. fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) where - Self: for<'a> PushInto<::Item<'a>>, - I: for<'a> FnMut(&::Item<'a>) -> usize, + Self: for<'a> PushInto<::Item<'a>>, + I: for<'a> FnMut(&::Item<'a>) -> usize, + Self::Container: IterContainer, { for datum in container.drain() { let index = index(&datum); @@ -142,6 +129,33 @@ pub trait ContainerBuilder: Default + 'static { /// If you have any questions about this trait you are best off not implementing it. pub trait LengthPreservingContainerBuilder : ContainerBuilder { } +/// A container builder that never produces any outputs, and can be used to pass through data in +/// operators. 
+#[derive(Debug, Clone)] +pub struct PassthroughContainerBuilder(std::marker::PhantomData); + +impl Default for PassthroughContainerBuilder { + #[inline(always)] + fn default() -> Self { + PassthroughContainerBuilder(std::marker::PhantomData) + } +} + +impl ContainerBuilder for PassthroughContainerBuilder +{ + type Container = C; + + #[inline(always)] + fn extract(&mut self) -> Option<&mut Self::Container> { + None + } + + #[inline(always)] + fn finish(&mut self) -> Option<&mut Self::Container> { + None + } +} + /// A default container builder that uses length and preferred capacity to chunk data. /// /// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not @@ -165,7 +179,7 @@ impl> PushInto for CapacityContainerBuil self.current.ensure_capacity(&mut self.empty); // Push item - self.current.push(item); + self.current.push_into(item); // Maybe flush if self.current.at_capacity() { @@ -189,7 +203,7 @@ impl ContainerBuilder for CapacityContainerBuild #[inline] fn finish(&mut self) -> Option<&mut C> { - if !self.current.is_empty() { + if self.current.count() > 0 { self.pending.push_back(std::mem::take(&mut self.current)); } self.empty = self.pending.pop_front(); @@ -197,22 +211,16 @@ impl ContainerBuilder for CapacityContainerBuild } } -impl LengthPreservingContainerBuilder for CapacityContainerBuilder { } +impl LengthPreservingContainerBuilder for CapacityContainerBuilder { } impl Container for Vec { + #[inline(always)] fn count(&self) -> usize { Vec::len(self) } + #[inline(always)] fn clear(&mut self) { Vec::clear(self) } +} + +impl IterContainer for Vec { type ItemRef<'a> = &'a T where T: 'a; type Item<'a> = T where T: 'a; - - fn len(&self) -> usize { - Vec::len(self) - } - - fn is_empty(&self) -> bool { - Vec::is_empty(self) - } - - fn clear(&mut self) { Vec::clear(self) } - type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { @@ -268,21 +276,11 @@ mod rc { use std::ops::Deref; use 
std::rc::Rc; - use crate::Container; + use crate::{Container, IterContainer}; impl Container for Rc { - type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; - type Item<'a> = T::ItemRef<'a> where Self: 'a; - - fn len(&self) -> usize { - std::ops::Deref::deref(self).len() - } - - fn is_empty(&self) -> bool { - std::ops::Deref::deref(self).is_empty() - } - - fn clear(&mut self) { + #[inline(always)] fn count(&self) -> usize { self.as_ref().count() } + #[inline(always)] fn clear(&mut self) { // Try to reuse the allocation if possible if let Some(inner) = Rc::get_mut(self) { inner.clear(); @@ -290,7 +288,12 @@ mod rc { *self = Self::default(); } } + } + + impl IterContainer for Rc { + type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; + type Item<'a> = T::ItemRef<'a> where Self: 'a; type Iter<'a> = T::Iter<'a> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { @@ -309,21 +312,11 @@ mod arc { use std::ops::Deref; use std::sync::Arc; - use crate::Container; - - impl Container for Arc { - type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; - type Item<'a> = T::ItemRef<'a> where Self: 'a; - - fn len(&self) -> usize { - std::ops::Deref::deref(self).len() - } - - fn is_empty(&self) -> bool { - std::ops::Deref::deref(self).is_empty() - } + use crate::{Container, IterContainer}; - fn clear(&mut self) { + impl Container for std::sync::Arc { + #[inline(always)] fn count(&self) -> usize { self.as_ref().count() } + #[inline(always)] fn clear(&mut self) { // Try to reuse the allocation if possible if let Some(inner) = Arc::get_mut(self) { inner.clear(); @@ -331,7 +324,11 @@ mod arc { *self = Self::default(); } } + } + impl IterContainer for Arc { + type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; + type Item<'a> = T::ItemRef<'a> where Self: 'a; type Iter<'a> = T::Iter<'a> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index d1869ee0a..5026fd9ca 100644 --- a/timely/examples/columnar.rs +++ 
b/timely/examples/columnar.rs @@ -2,7 +2,7 @@ use { std::collections::HashMap, - timely::{Container, container::CapacityContainerBuilder}, + timely::container::{CapacityContainerBuilder, IterContainer}, timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, timely::dataflow::InputHandleCore, timely::dataflow::operators::{Inspect, Operator, Probe}, @@ -165,9 +165,10 @@ mod container { } } - impl timely::Container for Column { - fn len(&self) -> usize { self.borrow().len() } + impl timely::container::Container for Column { + #[inline(always)] fn count(&self) -> usize { self.borrow().len() } // This sets `self` to be an empty `Typed` variant, appropriate for pushing into. + #[inline(always)] fn clear(&mut self) { match self { Column::Typed(t) => t.clear(), @@ -175,7 +176,8 @@ mod container { Column::Align(_) => *self = Column::Typed(Default::default()), } } - + } + impl timely::container::IterContainer for Column { type ItemRef<'a> = C::Ref<'a>; type Iter<'a> = IterOwn>; fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() } diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 55404015c..e7f761048 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -10,7 +10,7 @@ use std::{fmt::{self, Debug}, marker::PhantomData}; use std::rc::Rc; -use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}}; +use crate::container::{ContainerBuilder, LengthPreservingContainerBuilder, IterContainer, Container, SizableContainer, CapacityContainerBuilder, PushInto}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; @@ -52,8 +52,8 @@ pub type Exchange = ExchangeCore>, F>; impl ExchangeCore where - CB: LengthPreservingContainerBuilder, - for<'a> F: 
FnMut(&::Item<'a>)->u64 + CB: LengthPreservingContainerBuilder, + for<'a> F: FnMut(&::Item<'a>)->u64 { /// Allocates a new `Exchange` pact from a distribution function. pub fn new_core(func: F) -> ExchangeCore { @@ -66,7 +66,7 @@ where impl ExchangeCore, F> where - C: SizableContainer, + C: SizableContainer + IterContainer, for<'a> F: FnMut(&C::Item<'a>)->u64 { /// Allocates a new `Exchange` pact from a distribution function. @@ -81,10 +81,10 @@ where // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. impl ParallelizationContract for ExchangeCore where - CB: ContainerBuilder, - CB: for<'a> PushInto<::Item<'a>>, + CB: ContainerBuilder, + CB: for<'a> PushInto<::Item<'a>>, CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64 { type Pusher = ExchangePusher>>>, H>; type Puller = LogPuller>>>; @@ -147,7 +147,7 @@ impl>> Push> for LogPusher< source: self.source, target: self.target, seq_no: self.counter - 1, - length: bundle.data.len(), + length: bundle.data.count(), }) } } @@ -194,7 +194,7 @@ impl>> Pull> for LogPuller< source: bundle.from, target, seq_no: bundle.seq, - length: bundle.data.len(), + length: bundle.data.count(), }); } } diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 6f71848a4..4693dfef1 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -49,7 +49,7 @@ impl>> Counter let guard = ConsumedGuard { consumed: Rc::clone(&self.consumed), time: Some(message.time.clone()), - len: message.data.len(), + len: message.data.count(), }; Some((guard, message)) } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index be579936c..fc6be3055 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ 
b/timely/src/dataflow/channels/pushers/buffer.rs @@ -109,7 +109,7 @@ impl>> Buffer 0 { self.flush(); let time = self.time.as_ref().unwrap().clone(); Message::push_at(container, time, &mut self.pusher); diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index a6a16ce5d..a1f39fbba 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -21,7 +21,7 @@ impl Push> for Counter whe #[inline] fn push(&mut self, message: &mut Option>) { if let Some(message) = message { - self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); + self.produced.borrow_mut().update(message.time.clone(), message.data.count() as i64); } // only propagate `None` if dirty (indicates flush) diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index fec26cae7..a495f9476 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -1,17 +1,17 @@ //! The exchange pattern distributes pushed data between many target pushees. use crate::communication::Push; -use crate::container::{ContainerBuilder, PushInto}; +use crate::container::{ContainerBuilder, IterContainer, PushInto}; use crate::dataflow::channels::Message; -use crate::{Container, Data}; +use crate::Data; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. pub struct Exchange where - CB: ContainerBuilder, + CB: ContainerBuilder, P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64 { pushers: Vec

, builders: Vec, @@ -21,9 +21,9 @@ where impl Exchange where - CB: ContainerBuilder, + CB: ContainerBuilder, P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64 { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { @@ -50,10 +50,10 @@ where impl Push> for Exchange where - CB: ContainerBuilder, - CB: for<'a> PushInto<::Item<'a>>, + CB: ContainerBuilder, + CB: for<'a> PushInto<::Item<'a>>, P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64 { #[inline(never)] fn push(&mut self, message: &mut Option>) { diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 924bd196a..3798adc83 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -4,6 +4,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::{Scope, Stream, StreamCore}; use crate::{Container, Data}; +use crate::container::PassthroughContainerBuilder; /// Extension trait for `Stream`. pub trait Branch { @@ -99,8 +100,8 @@ impl BranchWhen for StreamCore>(); + let (mut output2, stream2) = builder.new_output::>(); builder.build(move |_| { @@ -110,9 +111,9 @@ impl BranchWhen for StreamCore { @@ -48,7 +48,7 @@ pub trait Extract { fn extract(self) -> Vec<(T, C)>; } -impl Extract for ::std::sync::mpsc::Receiver> +impl Extract for ::std::sync::mpsc::Receiver> where for<'a> C: PushInto>, for<'a> C::Item<'a>: Ord, @@ -71,11 +71,9 @@ where to_sort.sort(); let mut sorted = C::default(); for datum in to_sort.into_iter() { - sorted.push(datum); - } - if !sorted.is_empty() { - result.push((time, sorted)); + sorted.push_into(datum); } + result.push((time, sorted)); } result } diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 49878b887..0d6081bf4 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -38,6 +38,7 @@ //! allowing the replay to occur in a timely dataflow computation with more or fewer workers //! than that in which the stream was captured. 
+use crate::container::PassthroughContainerBuilder; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; @@ -76,7 +77,7 @@ where let (targets, stream) = builder.new_output(); - let mut output = PushBuffer::new(PushCounter::new(targets)); + let mut output = PushBuffer::<_,PassthroughContainerBuilder<_>,_>::new(PushCounter::new(targets)); let mut event_streams = self.into_iter().collect::>(); let mut started = false; let mut allocation: C = Default::default(); @@ -100,14 +101,14 @@ where progress.internals[0].extend(vec.into_iter()); }, Owned(Event::Messages(time, mut data)) => { - output.session(&time).give_container(&mut data); + output.session_with_builder(&time).give_container(&mut data); } Borrowed(Event::Progress(vec)) => { progress.internals[0].extend(vec.iter().cloned()); }, Borrowed(Event::Messages(time, data)) => { allocation.clone_from(data); - output.session(time).give_container(&mut allocation); + output.session_with_builder(time).give_container(&mut allocation); } } } diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index 24fcef532..36fc1c71c 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -2,6 +2,7 @@ use crate::{Container, Data}; +use crate::container::PassthroughContainerBuilder; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{StreamCore, Scope}; @@ -77,7 +78,7 @@ impl Concatenate for G { let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::>(); // create one output handle for the concatenated results. - let (mut output, result) = builder.new_output(); + let (mut output, result) = builder.new_output::>(); // build an operator that plays out all input data. 
builder.build(move |_capability| { @@ -86,7 +87,7 @@ impl Concatenate for G { let mut output = output.activate(); for handle in handles.iter_mut() { handle.for_each(|time, data| { - output.session(&time).give_container(data); + output.session_with_builder(&time).give_container(data); }) } } diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index bc221c4a6..eedca6963 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -218,7 +218,7 @@ where source: self.index, target: self.index, seq_no: self.counter, - length: bundle.data.len(), + length: bundle.data.count(), }; let recv_event = MessagesEvent { is_send: false, diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs index bb021a5d5..bc04c6d88 100644 --- a/timely/src/dataflow/operators/core/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -1,13 +1,13 @@ //! Exchange records between workers. use crate::ExchangeData; -use crate::container::{Container, SizableContainer, PushInto}; +use crate::container::{IterContainer, SizableContainer, PushInto}; use crate::dataflow::channels::pact::ExchangeCore; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::{Scope, StreamCore}; /// Exchange records between workers. -pub trait Exchange { +pub trait Exchange { /// Exchange records between workers. 
/// /// The closure supplied should map a reference to a record to a `u64`, @@ -30,7 +30,7 @@ pub trait Exchange { impl Exchange for StreamCore where - C: SizableContainer + ExchangeData + crate::dataflow::channels::ContainerBytes, + C: SizableContainer + IterContainer + ExchangeData + crate::dataflow::channels::ContainerBytes, C: for<'a> PushInto>, { diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 61bd5c196..229dc6939 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -1,7 +1,7 @@ //! Create cycles in a timely dataflow graph. use crate::{Container, Data}; -use crate::container::CapacityContainerBuilder; +use crate::container::PassthroughContainerBuilder; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pushers::Tee; use crate::dataflow::operators::generic::OutputWrapper; @@ -122,7 +122,7 @@ impl ConnectLoop for StreamCore { if let Some(new_time) = summary.results_in(cap.time()) { let new_cap = cap.delayed(&new_time); output - .session(&new_cap) + .session_with_builder(&new_cap) .give_container(data); } }); @@ -135,5 +135,5 @@ impl ConnectLoop for StreamCore { pub struct Handle { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper, Tee>, + output: OutputWrapper, Tee>, } diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index 80ef564b0..3e1901d74 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -1,12 +1,12 @@ //! Filters a stream by a predicate. -use crate::container::{Container, SizableContainer, PushInto}; +use crate::container::{IterContainer, SizableContainer, PushInto}; use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. 
-pub trait Filter { +pub trait Filter { /// Returns a new instance of `self` containing only records satisfying `predicate`. /// /// # Examples @@ -23,16 +23,14 @@ pub trait Filter { fn filter)->bool+'static>(&self, predicate: P) -> Self; } -impl Filter for StreamCore +impl Filter for StreamCore where for<'a> C: PushInto> { fn filter)->bool+'static>(&self, mut predicate: P) -> StreamCore { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each(|time, data| { - if !data.is_empty() { - output.session(&time).give_iterator(data.drain().filter(&mut predicate)); - } + output.session(&time).give_iterator(data.drain().filter(&mut predicate)); }); }) } diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index c5f974432..533047b0f 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto}; +use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto}; use crate::scheduling::{Schedule, Activator}; @@ -59,7 +59,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore); + fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore); /// Create a new [StreamCore] and [Handle] through which to supply input. 
/// @@ -134,7 +134,7 @@ pub trait Input : Scope { use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore) { + fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore) { let mut handle = Handle::new(); let stream = self.input_from(&mut handle); (handle, stream) @@ -224,7 +224,7 @@ pub struct Handle { now_at: T, } -impl Handle> { +impl Handle> { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -440,7 +440,7 @@ impl Handle { /// }); /// ``` pub fn send_batch(&mut self, buffer: &mut CB::Container) { - if !buffer.is_empty() { + if buffer.count() > 0 { // flush buffered elements to ensure local fifo. self.flush(); Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at); diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index b8c41f97b..305e700ff 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -1,12 +1,13 @@ //! Extension trait and implementation for observing and action on streamed data. use crate::{Container, Data}; +use crate::container::{IterContainer, PassthroughContainerBuilder}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. -pub trait Inspect: InspectCore + Sized { +pub trait Inspect: InspectCore + Sized { /// Runs a supplied closure on each observed data element. 
/// /// # Examples @@ -90,14 +91,14 @@ pub trait Inspect: InspectCore + Sized { fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl Inspect for StreamCore { +impl Inspect for StreamCore { fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { self.inspect_container(func) } } /// Inspect containers -pub trait InspectCore { +pub trait InspectCore { /// Runs a supplied closure on each observed container, and each frontier advancement. /// /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data, @@ -127,7 +128,7 @@ impl InspectCore for StreamCore { { use crate::progress::timestamp::Timestamp; let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum()); - self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| { + self.unary_frontier::,_,_,_>(Pipeline, "InspectBatch", move |_,_| move |input, output| { if input.frontier.frontier() != frontier.borrow() { frontier.clear(); frontier.extend(input.frontier.frontier().iter().cloned()); @@ -135,7 +136,7 @@ impl InspectCore for StreamCore { } input.for_each(|time, data| { func(Ok((&time, &*data))); - output.session(&time).give_container(data); + output.session_with_builder(&time).give_container(data); }); }) } diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 8af70e4a4..adfd1063c 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -1,13 +1,13 @@ //! Extension methods for `StreamCore` based on record-by-record transformation. 
-use crate::container::{Container, SizableContainer, PushInto}; +use crate::container::{IterContainer, SizableContainer, PushInto}; use crate::Data; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for `Stream`. -pub trait Map { +pub trait Map { /// Consumes each element of the stream and yields a new element. /// /// # Examples @@ -24,7 +24,7 @@ pub trait Map { /// ``` fn map(&self, mut logic: L) -> StreamCore where - C2: SizableContainer + PushInto + Data, + C2: SizableContainer + IterContainer + PushInto + Data, L: FnMut(C::Item<'_>)->D2 + 'static, { self.flat_map(move |x| std::iter::once(logic(x))) @@ -46,19 +46,19 @@ pub trait Map { fn flat_map(&self, logic: L) -> StreamCore where I: IntoIterator, - C2: SizableContainer + PushInto + Data, + C2: SizableContainer + IterContainer + PushInto + Data, L: FnMut(C::Item<'_>)->I + 'static, ; } -impl Map for StreamCore { +impl Map for StreamCore { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. fn flat_map(&self, mut logic: L) -> StreamCore where I: IntoIterator, - C2: SizableContainer + PushInto + Data, + C2: SizableContainer + IterContainer + PushInto + Data, L: FnMut(C::Item<'_>)->I + 'static, { self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 77fd1f527..e8d1aa835 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -1,13 +1,13 @@ //! 
Operators that separate one stream into two streams based on some condition -use crate::container::{Container, SizableContainer, PushInto}; +use crate::container::{IterContainer, SizableContainer, PushInto}; use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::{Scope, StreamCore}; /// Extension trait for `Stream`. -pub trait OkErr { +pub trait OkErr { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with the data. /// If it returns `Ok(x)`, then `x` will be sent @@ -39,7 +39,7 @@ pub trait OkErr { ; } -impl OkErr for StreamCore { +impl OkErr for StreamCore { fn ok_err( &self, mut logic: L, diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index 754e9182c..6517ab94a 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -1,6 +1,6 @@ //! Partition a stream of records into multiple streams. -use timely_container::{Container, ContainerBuilder, PushInto}; +use timely_container::{IterContainer, ContainerBuilder, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; @@ -8,7 +8,7 @@ use crate::dataflow::{Scope, StreamCore}; use crate::Data; /// Partition a stream of records into multiple streams. -pub trait Partition { +pub trait Partition { /// Produces `parts` output streams, containing records produced and assigned by `route`. 
/// /// # Examples @@ -34,7 +34,7 @@ pub trait Partition { F: FnMut(C::Item<'_>) -> (u64, D2) + 'static; } -impl Partition for StreamCore { +impl Partition for StreamCore { fn partition(&self, parts: u64, mut route: F) -> Vec> where CB: ContainerBuilder + PushInto, diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index cb3c96b34..997b3922e 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -14,6 +14,7 @@ use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::dataflow::{StreamCore, Scope}; use crate::{Container, Data}; +use crate::container::PassthroughContainerBuilder; /// Monitors progress at a `Stream`. pub trait Probe { @@ -92,7 +93,7 @@ impl Probe for StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); let (tee, stream) = builder.new_output(); - let mut output = PushBuffer::new(PushCounter::new(tee)); + let mut output = PushBuffer::<_, PassthroughContainerBuilder<_>, _>::new(PushCounter::new(tee)); let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; @@ -115,7 +116,7 @@ impl Probe for StreamCore { while let Some(message) = input.next() { let time = &message.time; let data = &mut message.data; - output.session(time).give_container(data); + output.session_with_builder(time).give_container(data); } output.cease(); diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index fdc68b9d4..392637d22 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -4,6 +4,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::Operator; use crate::dataflow::{Scope, StreamCore}; use crate::{Container, Data}; +use crate::container::PassthroughContainerBuilder; use std::rc::Rc; /// Convert a stream 
into a stream of shared containers @@ -26,11 +27,11 @@ pub trait SharedStream { impl SharedStream for StreamCore { fn shared(&self) -> StreamCore> { - self.unary(Pipeline, "Shared", move |_, _| { + self.unary::,_,_,_>(Pipeline, "Shared", move |_, _| { move |input, output| { input.for_each(|time, data| { output - .session(&time) + .session_with_builder(&time) .give_container(&mut Rc::new(std::mem::take(data))); }); } diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index e74bedbc4..d592669f6 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -1,6 +1,7 @@ //! Extension methods for `Stream` based on record-by-record transformation. use crate::{Container, Data}; +use crate::container::PassthroughContainerBuilder; use crate::order::PartialOrder; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; @@ -53,7 +54,7 @@ impl Reclock for StreamCore { let mut stash = vec![]; - self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| { + self.binary_notify::<_,PassthroughContainerBuilder<_>,_,_,_>(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| { // stash each data input with its timestamp. input1.for_each(|cap, data| { @@ -67,7 +68,7 @@ impl Reclock for StreamCore { // each time with complete stash can be flushed. 
notificator.for_each(|cap,_,_| { - let mut session = output.session(&cap); + let mut session = output.session_with_builder(&cap); for &mut (ref t, ref mut data) in &mut stash { if t.less_equal(cap.time()) { session.give_container(data); diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 77c6ba81b..72d9c6ad4 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -2,8 +2,8 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::{Container, Data}; -use crate::container::{ContainerBuilder, CapacityContainerBuilder}; +use crate::Data; +use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer}; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -164,7 +164,7 @@ impl UnorderedHandle { } } -impl UnorderedHandle> { +impl UnorderedHandle> { /// Allocates a new automatically flushing session based on the supplied capability. 
#[inline] pub fn session(&mut self, cap: ActivateCapability) -> ActivateOnDrop, Counter>>> { diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 6a289e95e..09cd20733 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -16,7 +16,7 @@ use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; use crate::dataflow::channels::Message; use crate::communication::{Push, Pull}; use crate::{Container, Data}; -use crate::container::{ContainerBuilder, CapacityContainerBuilder}; +use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer}; use crate::logging::TimelyLogger as Logger; use crate::dataflow::operators::InputCapability; @@ -235,7 +235,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> } } -impl<'a, T: Timestamp, C: Container + Data, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { +impl<'a, T: Timestamp, C: SizableContainer + Data, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. /// /// In order to send data at a future timestamp, obtain a capability for the new timestamp diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 1c51f7ecd..4b17a891c 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -13,7 +13,7 @@ use super::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OperatorInfo; use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator}; use crate::{Container, Data}; -use crate::container::{ContainerBuilder, CapacityContainerBuilder}; +use crate::container::{ContainerBuilder, PassthroughContainerBuilder}; /// Methods to construct generic streaming and blocking operators. 
pub trait Operator { @@ -583,7 +583,13 @@ where /// }); /// ``` pub fn empty(scope: &G) -> StreamCore { - source::<_, CapacityContainerBuilder, _, _>(scope, "Empty", |_capability, _info| |_output| { + let mut builder = OperatorBuilder::new("Empty".to_owned(), scope.clone()); + + let (_output, stream) = builder.new_output::>(); + builder.set_notify(false); + builder.build(|_caps| |_frontier| { // drop capability, do nothing - }) + }); + + stream }