From 2e12b8227b5af2fac8f8f38fed5d327f9900dd8e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 28 Aug 2025 14:31:32 -0400 Subject: [PATCH 1/3] Introduce builder for flatmap operators --- timely/src/dataflow/operators/core/map.rs | 96 +++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 140eeba62..12b18ddbc 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -49,6 +49,41 @@ pub trait Map { C2: Container + SizableContainer + PushInto, L: FnMut(C::Item<'_>)->I + 'static, ; + + /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream. + /// + /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the + /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled + /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records + /// of intermediate stages. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{Capture, ToStream, core::Map}; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let data = timely::example(|scope| { + /// (0..10i32) + /// .to_stream(scope) + /// .flat_map_builder(|x| x + 1) + /// .map(|x| x + 1) + /// .map(|x| x + 1) + /// .map(|x| x + 1) + /// .map(Some) + /// .into_stream::>() + /// .capture() + /// }); + /// + /// assert_eq!((4..14).collect::>(), data.extract()[0].1); + /// ``` + fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, S, C, L, I> + where + C: Clone + 'static, + L: for<'a> Fn(C::Item<'a>) -> I, + Self: Sized, + { + FlatMapBuilder::new(self, logic) + } } impl Map for StreamCore { @@ -68,3 +103,64 @@ impl Map for StreamCore { }) } } + + +/// A stream wrapper that allows the accumulation of flatmap logic. +pub struct FlatMapBuilder<'t, T: Map, S: Scope, C: Container, F: 'static, I> +where + for<'a> F: Fn(C::Item<'a>) -> I, +{ + stream: &'t T, + logic: F, + marker: std::marker::PhantomData<(S, C)>, +} + +impl<'t, T: Map, S: Scope, C: Container + Clone + 'static, F, I> FlatMapBuilder<'t, T, S, C, F, I> +where + for<'a> F: Fn(C::Item<'a>) -> I, +{ + /// Create a new wrapper with no action on the stream. + pub fn new(stream: &'t T, logic: F) -> Self { + FlatMapBuilder { stream, logic, marker: std::marker::PhantomData } + } + + /// Transform a flatmapped stream through addiitonal logic. + pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, S, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> { + let logic = self.logic; + FlatMapBuilder { + stream: self.stream, + logic: move |x| g(logic(x)), + marker: std::marker::PhantomData, + } + } + /// Convert the wrapper into a stream. + pub fn into_stream(self) -> StreamCore + where + I: IntoIterator, + C2: SizableContainer + PushInto + Data, + { + Map::flat_map(self.stream, self.logic) + } +} + +#[cfg(test)] +mod tests { + use crate::dataflow::operators::{Capture, ToStream, core::Map}; + use crate::dataflow::operators::capture::Extract; + + #[test] + fn test_builder() { + let data = crate::example(|scope| { + let stream = (0..10i32).to_stream(scope); + stream.flat_map_builder(|x| x + 1) + .map(|x| x + 1) + .map(|x| x + 1) + .map(|x| x + 1) + .map(Some) + .into_stream::>() + .capture() + }); + + assert_eq!((4..14).collect::>(), data.extract()[0].1); + } +} \ No newline at end of file From e59e87b3798a52226e88fa930afebefa19d5b0d1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 28 Aug 2025 15:14:26 -0400 Subject: [PATCH 2/3] Move S generic to into_stream method --- timely/src/dataflow/operators/core/map.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 12b18ddbc..d9f1b42c8 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -70,13 +70,13 @@ pub trait Map { /// .map(|x| x + 1) /// .map(|x| x + 1) /// .map(Some) - /// .into_stream::>() + /// .into_stream::<_,Vec>() /// .capture() /// }); /// /// assert_eq!((4..14).collect::>(), data.extract()[0].1); /// ``` - fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, S, C, L, I> + fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I> where C: Clone + 'static, L: for<'a> Fn(C::Item<'a>) -> I, @@ -106,16 +106,16 @@ impl Map for StreamCore { /// A stream wrapper that allows the accumulation of flatmap logic. -pub struct FlatMapBuilder<'t, T: Map, S: Scope, C: Container, F: 'static, I> +pub struct FlatMapBuilder<'t, T, C: Container, F: 'static, I> where for<'a> F: Fn(C::Item<'a>) -> I, { stream: &'t T, logic: F, - marker: std::marker::PhantomData<(S, C)>, + marker: std::marker::PhantomData, } -impl<'t, T: Map, S: Scope, C: Container + Clone + 'static, F, I> FlatMapBuilder<'t, T, S, C, F, I> +impl<'t, T, C: Container + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I> where for<'a> F: Fn(C::Item<'a>) -> I, { @@ -125,7 +125,7 @@ where } /// Transform a flatmapped stream through addiitonal logic. - pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, S, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> { + pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> { let logic = self.logic; FlatMapBuilder { stream: self.stream, @@ -134,9 +134,11 @@ where } } /// Convert the wrapper into a stream. - pub fn into_stream(self) -> StreamCore + pub fn into_stream(self) -> StreamCore where I: IntoIterator, + S: Scope, + T: Map, C2: SizableContainer + PushInto + Data, { Map::flat_map(self.stream, self.logic) @@ -157,7 +159,7 @@ mod tests { .map(|x| x + 1) .map(|x| x + 1) .map(Some) - .into_stream::>() + .into_stream::<_,Vec>() .capture() }); From f2ab328d7a864db527149549e75a57f0c9ec0ff3 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 28 Aug 2025 15:29:48 -0400 Subject: [PATCH 3/3] Update for rebase --- timely/src/dataflow/operators/core/map.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index d9f1b42c8..221b86201 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -106,7 +106,7 @@ impl Map for StreamCore { /// A stream wrapper that allows the accumulation of flatmap logic. -pub struct FlatMapBuilder<'t, T, C: Container, F: 'static, I> +pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I> where for<'a> F: Fn(C::Item<'a>) -> I, { @@ -115,7 +115,7 @@ where marker: std::marker::PhantomData, } -impl<'t, T, C: Container + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I> +impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I> where for<'a> F: Fn(C::Item<'a>) -> I, { @@ -139,7 +139,7 @@ where I: IntoIterator, S: Scope, T: Map, - C2: SizableContainer + PushInto + Data, + C2: Container + SizableContainer + PushInto, { Map::flat_map(self.stream, self.logic) }