diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 4362a4113..e3e8af8a9 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -44,22 +44,14 @@ impl ParallelizationContract for Pip pub use exchange::{ExchangeCore, Exchange}; mod exchange { - use std::{fmt::{self, Debug}, marker::PhantomData}; - use std::rc::Rc; - - use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}; - use crate::communication::{Push, Pull}; - use crate::dataflow::channels::pushers::Exchange as ExchangePusher; + use crate::Container; + use crate::container::{DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder}; use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor; - use crate::dataflow::channels::Message; - use crate::logging::TimelyLogger as Logger; - use crate::progress::Timestamp; - use crate::worker::AsWorker; - use super::{ParallelizationContract, LogPusher, LogPuller}; + use super::DistributorPact; /// An exchange between multiple observers by data - pub struct ExchangeCore { hash_func: F, phantom: PhantomData } + pub type ExchangeCore = DistributorPact DrainContainerDistributor>>; /// [ExchangeCore] specialized to vector-based containers. pub type Exchange = ExchangeCore>, F>; @@ -68,56 +60,22 @@ mod exchange { where CB: LengthPreservingContainerBuilder, CB::Container: DrainContainer, - for<'a> F: FnMut(&::Item<'a>)->u64 + for<'a> F: FnMut(&::Item<'a>)->u64 + 'static { /// Allocates a new `Exchange` pact from a distribution function. pub fn new_core(func: F) -> ExchangeCore { - ExchangeCore { - hash_func: func, - phantom: PhantomData, - } + DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers))) } } impl ExchangeCore, F> where - C: SizableContainer + DrainContainer, - for<'a> F: FnMut(&C::Item<'a>)->u64 + C: Container + SizableContainer + DrainContainer, + for<'a> F: FnMut(&C::Item<'a>)->u64 + 'static { /// Allocates a new `Exchange` pact from a distribution function. pub fn new(func: F) -> ExchangeCore, F> { - ExchangeCore { - hash_func: func, - phantom: PhantomData, - } - } - } - - // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. - impl ParallelizationContract for ExchangeCore - where - CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, - CB::Container: Send + crate::dataflow::channels::ContainerBytes, - for<'a> H: FnMut(&::Item<'a>) -> u64 + 'static, - { - type Pusher = ExchangePusher< - T, - LogPusher>>>, - DrainContainerDistributor - >; - type Puller = LogPuller>>>; - - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers()); - (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) - } - } - - impl Debug for ExchangeCore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Exchange").finish() + DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers))) } } } @@ -271,4 +229,4 @@ mod push_pull { result } } -} \ No newline at end of file +}