From 722fe094c1e92e963d212d17012af699e2212401 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 12 Sep 2025 14:41:11 -0400 Subject: [PATCH 1/2] Remove Filter and Freeze wrappers --- differential-dataflow/examples/freeze.rs | 109 ------- .../src/operators/arrange/arrangement.rs | 39 --- .../src/trace/wrappers/filter.rs | 211 ------------- .../src/trace/wrappers/freeze.rs | 295 ------------------ .../src/trace/wrappers/mod.rs | 3 - 5 files changed, 657 deletions(-) delete mode 100644 differential-dataflow/examples/freeze.rs delete mode 100644 differential-dataflow/src/trace/wrappers/filter.rs delete mode 100644 differential-dataflow/src/trace/wrappers/freeze.rs diff --git a/differential-dataflow/examples/freeze.rs b/differential-dataflow/examples/freeze.rs deleted file mode 100644 index 0441f9ab55..0000000000 --- a/differential-dataflow/examples/freeze.rs +++ /dev/null @@ -1,109 +0,0 @@ -use timely::dataflow::operators::probe::Handle; -use timely::dataflow::operators::Map; - -use differential_dataflow::input::Input; -use differential_dataflow::AsCollection; -use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; -use differential_dataflow::trace::wrappers::freeze::freeze; - -fn main() { - - // define a new computational scope, in which to run BFS - timely::execute_from_args(std::env::args(), move |worker| { - - // define BFS dataflow; return handles to roots and edges inputs - let mut probe = Handle::new(); - let (mut rules, mut graph) = worker.dataflow(|scope| { - - let (rule_input, rules) = scope.new_collection(); - let (edge_input, graph) = scope.new_collection(); - - let result = graph.iterate(|inner| { - - let rules = rules.enter(&inner.scope()); - let arranged = inner.arrange_by_key(); - - // rule 0: remove self-loops: - let freeze0 = freeze(&arranged, |t| { - if t.inner <= 0 { - let mut t = t.clone(); - t.inner = 0; - Some(t) - } - else { None } - }); - let rule0 = 
freeze0.as_collection(|&k,&v| (k,v)) - .filter(|x| x.0 == x.1) - .negate() - .inspect(|x| println!("rule0:\t{:?}", x)); - - // subtract self loops once, not each round. - let rule0 = &rule0.inner - .map_in_place(|dtr| { dtr.1.inner += 1; }) - .as_collection() - .negate() - .concat(&rule0); - - // rule 1: overwrite keys present in `rules` - let freeze1 = freeze(&arranged, |t| { - if t.inner <= 1 { - let mut t = t.clone(); - t.inner = 1; - Some(t) - } - else { None } - }); - let rule1 = freeze1.join_core(&rules.map(|(x,_y)| x).distinct().arrange_by_self(), |&k, &x, &()| Some((k,x))) - .negate() - .concat(&rules.inner.map_in_place(|dtr| dtr.1.inner = 1).as_collection()) - .inspect(|x| println!("rule1:\t{:?}", x)); - - let rule1 = &rule1.inner - .map_in_place(|dtr| { dtr.1.inner += 1; }) - .as_collection() - .negate() - .concat(&rule1); - - inner - .concat(&rule0) - .concat(&rule1) - .consolidate() - .inspect(|x| println!("inner:\t{:?}", x)) - }); - - result.consolidate() - .inspect(|x| println!("output\t{:?}", x)) - .probe_with(&mut probe); - - (rule_input, edge_input) - }); - - println!("starting up"); - - graph.insert((0, 1)); - graph.insert((1, 1)); - graph.insert((2, 1)); - graph.insert((2, 3)); - graph.advance_to(1); graph.flush(); - rules.advance_to(1); rules.flush(); - - while probe.less_than(graph.time()) { worker.step(); } - println!("round 0 complete"); - - graph.insert((3, 3)); - graph.advance_to(2); graph.flush(); - rules.advance_to(2); rules.flush(); - - while probe.less_than(graph.time()) { worker.step(); } - println!("round 1 complete"); - - rules.insert((2, 2)); - graph.advance_to(3); graph.flush(); - rules.advance_to(3); rules.flush(); - - while probe.less_than(graph.time()) { worker.step(); } - println!("round 2 complete"); - - }).unwrap(); -} \ No newline at end of file diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 05cab53289..66ce3471d0 100644 --- 
a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -35,7 +35,6 @@ use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; -use trace::wrappers::filter::{TraceFilter, BatchFilter}; use super::TraceAgent; @@ -131,44 +130,6 @@ where } } - /// Filters an arranged collection. - /// - /// This method produces a new arrangement backed by the same shared - /// arrangement as `self`, paired with user-specified logic that can - /// filter by key and value. The resulting collection is restricted - /// to the keys and values that return true under the user predicate. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::arrange::ArrangeByKey; - /// - /// ::timely::example(|scope| { - /// - /// let arranged = - /// scope.new_collection_from(0 .. 10).1 - /// .map(|x| (x, x+1)) - /// .arrange_by_key(); - /// - /// arranged - /// .filter(|k,v| k == v) - /// .as_collection(|k,v| (*k,*v)) - /// .assert_empty(); - /// }); - /// ``` - pub fn filter(&self, logic: F) - -> Arranged> - where - F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static, - { - let logic1 = logic.clone(); - let logic2 = logic.clone(); - Arranged { - trace: TraceFilter::make_from(self.trace.clone(), logic1), - stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())), - } - } /// Flattens the stream into a `Collection`. 
/// /// The underlying `Stream>` is a much more efficient way to access the data, diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs deleted file mode 100644 index 363ad04619..0000000000 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ /dev/null @@ -1,211 +0,0 @@ -//! Wrapper for filtered trace. - -use timely::progress::frontier::AntichainRef; - -use crate::trace::{TraceReader, BatchReader, Description}; -use crate::trace::cursor::Cursor; - -/// Wrapper to provide trace to nested scope. -pub struct TraceFilter { - trace: Tr, - logic: F, -} - -impl Clone for TraceFilter -where - Tr: TraceReader+Clone, - F: Clone, -{ - fn clone(&self) -> Self { - TraceFilter { - trace: self.trace.clone(), - logic: self.logic.clone(), - } - } -} - -impl WithLayout for TraceFilter { - type Layout = Tr::Layout; -} - -impl TraceReader for TraceFilter -where - Tr: TraceReader, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static, -{ - type Batch = BatchFilter; - type Storage = Tr::Storage; - type Cursor = CursorFilter; - - fn map_batches(&self, mut f: F2) { - let logic = self.logic.clone(); - self.trace - .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone()))) - } - - fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) } - fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() } - - fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) } - fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() } - - fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { - self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y)) - } -} - -impl TraceFilter { - /// Makes a new trace 
wrapper - pub fn make_from(trace: Tr, logic: F) -> Self { - TraceFilter { - trace, - logic, - } - } -} - - -/// Wrapper to provide batch to nested scope. -#[derive(Clone)] -pub struct BatchFilter { - batch: B, - logic: F, -} - -impl WithLayout for BatchFilter { - type Layout = B::Layout; -} - -impl BatchReader for BatchFilter -where - B: BatchReader, - F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static -{ - type Cursor = BatchCursorFilter; - - fn cursor(&self) -> Self::Cursor { - BatchCursorFilter::new(self.batch.cursor(), self.logic.clone()) - } - fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { self.batch.description() } -} - -impl BatchFilter { - /// Makes a new batch wrapper - pub fn make_from(batch: B, logic: F) -> Self { - BatchFilter { - batch, - logic, - } - } -} - -/// Wrapper to provide cursor to nested scope. -pub struct CursorFilter { - cursor: C, - logic: F, -} - -use crate::trace::implementations::WithLayout; -impl WithLayout for CursorFilter { - type Layout = C::Layout; -} - -impl CursorFilter { - fn new(cursor: C, logic: F) -> Self { - CursorFilter { - cursor, - logic, - } - } -} - -impl Cursor for CursorFilter -where - C: Cursor, - F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static -{ - type Storage = C::Storage; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } - #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } - - #[inline] - fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, 
logic: L) { - let key = self.key(storage); - let val = self.val(storage); - if (self.logic)(key, val) { - self.cursor.map_times(storage, logic) - } - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } -} - - - -/// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFilter { - cursor: C, - logic: F, -} - -impl WithLayout for BatchCursorFilter { - type Layout = C::Layout; -} - -impl BatchCursorFilter { - fn new(cursor: C, logic: F) -> Self { - BatchCursorFilter { - cursor, - logic, - } - } -} - -impl Cursor for BatchCursorFilter -where - F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static, -{ - type Storage = BatchFilter; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } - - #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } - - #[inline] - fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { - let key = 
self.key(storage); - let val = self.val(storage); - if (self.logic)(key, val) { - self.cursor.map_times(&storage.batch, logic) - } - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } -} diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs deleted file mode 100644 index d27d8ccc31..0000000000 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ /dev/null @@ -1,295 +0,0 @@ -//! Wrappers to transform the timestamps of updates. -//! -//! These wrappers are primarily intended to support the re-use of a multi-version index -//! as if it were frozen at a particular (nested) timestamp. For example, if one wants to -//! re-use an index multiple times with minor edits, and only observe the edits at one -//! logical time (meaning: observing all edits less or equal to that time, advanced to that -//! time), this should allow that behavior. -//! -//! Informally, this wrapper is parameterized by a function `F: Fn(&T)->Option` which -//! provides the opportunity to alter the time at which an update happens and to suppress -//! that update, if appropriate. For example, the function -//! -//! ```ignore -//! |t| if t.inner <= 10 { let mut t = t.clone(); t.inner = 10; Some(t) } else { None } -//! ``` -//! -//! 
could be used to present all updates through inner iteration 10, but advanced to inner -//! iteration 10, as if they all occurred exactly at that moment. - -use std::rc::Rc; - -use timely::dataflow::Scope; -use timely::dataflow::operators::Map; -use timely::progress::frontier::AntichainRef; - -use crate::operators::arrange::Arranged; -use crate::trace::{TraceReader, BatchReader, Description}; -use crate::trace::cursor::Cursor; - -/// Freezes updates to an arrangement using a supplied function. -/// -/// This method is experimental, and should be used with care. The intent is that the function -/// `func` can be used to restrict and lock in updates at a particular time, as suggested in the -/// module-level documentation. -pub fn freeze(arranged: &Arranged, func: F) -> Arranged> -where - G: Scope, - T: TraceReader+Clone, - F: Fn(T::TimeGat<'_>)->Option+'static, -{ - let func1 = Rc::new(func); - let func2 = func1.clone(); - Arranged { - stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())), - trace: TraceFreeze::make_from(arranged.trace.clone(), func2), - } -} - -/// Wrapper to provide trace to nested scope. 
-pub struct TraceFreeze -where - Tr: TraceReader, - F: Fn(Tr::TimeGat<'_>)->Option, -{ - trace: Tr, - func: Rc, -} - -impl Clone for TraceFreeze -where - Tr: TraceReader+Clone, - F: Fn(Tr::TimeGat<'_>)->Option, -{ - fn clone(&self) -> Self { - TraceFreeze { - trace: self.trace.clone(), - func: self.func.clone(), - } - } -} - -impl WithLayout for TraceFreeze -where - Tr: TraceReader, - F: Fn(Tr::TimeGat<'_>)->Option, -{ - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl TraceReader for TraceFreeze -where - Tr: TraceReader, - F: Fn(Tr::TimeGat<'_>)->Option+'static, -{ - type Batch = BatchFreeze; - type Storage = Tr::Storage; - type Cursor = CursorFreeze; - - fn map_batches(&self, mut f: F2) { - let func = &self.func; - self.trace.map_batches(|batch| { - f(&Self::Batch::make_from(batch.clone(), func.clone())); - }) - } - - fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) } - fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() } - - fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) } - fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() } - - fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { - let func = &self.func; - self.trace.cursor_through(upper) - .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage)) - } -} - -impl TraceFreeze -where - Tr: TraceReader, - F: Fn(Tr::TimeGat<'_>)->Option, -{ - /// Makes a new trace wrapper - pub fn make_from(trace: Tr, func: Rc) -> Self { - Self { trace, func } - } -} - - -/// Wrapper to provide batch to nested scope. 
-pub struct BatchFreeze { - batch: B, - func: Rc, -} - -impl Clone for BatchFreeze { - fn clone(&self) -> Self { - BatchFreeze { - batch: self.batch.clone(), - func: self.func.clone(), - } - } -} - -impl WithLayout for BatchFreeze -where - B: BatchReader, - F: Fn(B::TimeGat<'_>)->Option, -{ - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl BatchReader for BatchFreeze -where - B: BatchReader, - F: Fn(B::TimeGat<'_>)->Option, -{ - type Cursor = BatchCursorFreeze; - - fn cursor(&self) -> Self::Cursor { - BatchCursorFreeze::new(self.batch.cursor(), self.func.clone()) - } - fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { self.batch.description() } -} - -impl BatchFreeze -where - B: BatchReader, - F: Fn(B::TimeGat<'_>)->Option -{ - /// Makes a new batch wrapper - pub fn make_from(batch: B, func: Rc) -> Self { - Self { batch, func } - } -} - -/// Wrapper to provide cursor to nested scope. -pub struct CursorFreeze { - cursor: C, - func: Rc, -} - -use crate::trace::implementations::{Layout, WithLayout}; -impl WithLayout for CursorFreeze { - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl CursorFreeze { - fn new(cursor: C, func: Rc) -> Self { - Self { cursor, func } - } -} - -impl Cursor for CursorFreeze -where - C: Cursor, - F: Fn(C::TimeGat<'_>)->Option, -{ - type Storage = C::Storage; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } - 
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } - - #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { - let func = &self.func; - self.cursor.map_times(storage, |time, diff| { - if let Some(time) = func(time) { - logic(&time, diff); - } - }) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } -} - - -/// Wrapper to provide cursor to nested scope. 
-pub struct BatchCursorFreeze { - cursor: C, - func: Rc, -} - -impl WithLayout for BatchCursorFreeze { - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl BatchCursorFreeze { - fn new(cursor: C, func: Rc) -> Self { - Self { cursor, func } - } -} - -// impl, B: BatchReader, F> Cursor for BatchCursorFreeze -impl Cursor for BatchCursorFreeze -where - F: Fn(C::TimeGat<'_>)->Option, -{ - type Storage = BatchFreeze; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } - - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(&storage.batch) } - #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(&storage.batch) } - - #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { - let func = &self.func; - self.cursor.map_times(&storage.batch, |time, diff| { - if let Some(time) = func(time) { - logic(&time, diff); - } - }) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) } - #[inline] fn rewind_vals(&mut 
self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) } -} diff --git a/differential-dataflow/src/trace/wrappers/mod.rs b/differential-dataflow/src/trace/wrappers/mod.rs index 82db1fe40e..aebaa72a6e 100644 --- a/differential-dataflow/src/trace/wrappers/mod.rs +++ b/differential-dataflow/src/trace/wrappers/mod.rs @@ -4,6 +4,3 @@ pub mod enter; pub mod enter_at; pub mod frontier; pub mod rc; - -pub mod filter; -pub mod freeze; From 5c2009816b74cd32fe0468f5ee4f21aaa46a107a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 12 Sep 2025 16:53:56 -0400 Subject: [PATCH 2/2] Apologize in mdbook --- mdbook/src/chapter_5/chapter_5_4.md | 48 ++--------------------------- 1 file changed, 3 insertions(+), 45 deletions(-) diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index 4a28792ccc..2c3155a6ac 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -6,51 +6,9 @@ The set of trace wrappers grows as more idioms are discovered and implemented, b ## Filter -Like a collection, an arrangement supports the `filter(predicate)` operator that reduces the data down to those elements satisfying `predicate`. Unlike a collection, which produces a new collection when filtered, a filtered arrangement is just a wrapper around the existing arrangement. - -The following example uses two different collections in its two joins, but one is a filtered version of the other and can re-use the same arrangement. - -```rust -extern crate timely; -extern crate differential_dataflow; - -use differential_dataflow::operators::JoinCore; -use differential_dataflow::operators::arrange::ArrangeByKey; - -fn main() { - - // define a new timely dataflow computation. 
- timely::execute_from_args(::std::env::args(), move |worker| { - - let mut knows = differential_dataflow::input::InputSession::new(); - let mut query = differential_dataflow::input::InputSession::new(); - - worker.dataflow(|scope| { - - let knows = knows.to_collection(scope); - let query = query.to_collection(scope); - - // Arrange the data first! (by key). - let knows1 = knows.arrange_by_key(); - - // Filter to equal pairs (for some reason). - let knows2 = knows1.filter(|k,v| k == v); - - // Same logic as before, with a new method name. - query.join_core(&knows1, |x,q,y| Some((*y,(*x,*q)))) - .join_core(&knows2, |y,(x,q),z| Some((*q,(*x,*y,*z)))) - .inspect(|result| println!("result {:?}", result)); - - }); - -# // to help with type inference ... -# knows.update_at((0,0), 0usize, 1isize); -# query.update_at((0,0), 0usize, 1isize); - }); -} -``` - -Filtered arrangements are not always a win. If the input arrangement is large and the filtered arrangement is small, it may make more sense to build and maintain a second arrangement than to continually search through the large arrangement for records satisfying the predicate. If you would like to form a second arrangement, you can use `as_collection()` to return to a collection, filter the result, and then arrange it again. +The filter wrapper has been removed. +It's still a neat filter, but its existence was constraining development velocity. +If you need a filtered arrangement, you can use `as_collection()` to return to a collection, filter the result, and then arrange it again. Reach out if you worry this was wrong, and we can discuss alternatives! ## Entering scopes