Commit c45a709
VecCollection
1 parent 7252f06

Some content is hidden: large commits have some of their changed files collapsed by default, so only a subset of the diff appears below.

63 files changed, +400 -400 lines
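
The commit itself is a mechanical rename across the examples: the Vec-backed collection type previously exported as `Collection` is now exported as `VecCollection`, `VariableRow` becomes `VecVariable`, and, as the iterate_container.rs hunk below suggests, the container-generic type formerly called `CollectionCore` now takes the plain name `Collection`. As a rough illustration of what downstream code has to change, the sketch below shows a hypothetical user function before and after the rename; the function itself is invented for this illustration and is not part of the commit.

// Hypothetical downstream code, shown only to illustrate the rename.
//
// Before this commit the Vec-backed type was imported as
//     use differential_dataflow::Collection;
// and signatures were written against `Collection<G, D>`.
//
// After this commit the same type is imported as `VecCollection`:
use differential_dataflow::VecCollection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::*;
use timely::dataflow::Scope;

fn distinct_sources<G>(edges: &VecCollection<G, (u32, u32)>) -> VecCollection<G, u32>
where
    G: Scope<Timestamp: Lattice + Ord>,
{
    // Only the type names change; the operator calls themselves are untouched by this commit.
    edges.map(|(src, _dst)| src).distinct()
}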

advent_of_code_2017/src/bin/day_07.rs
Lines changed: 2 additions & 2 deletions

@@ -1074,7 +1074,7 @@ tvhftq (35)";
  let index = worker.index();
  let peers = worker.peers();

- let worker_input =
+ let worker_input =
  input
  .split('\n')
  .enumerate()
@@ -1102,7 +1102,7 @@ tvhftq (35)";
  let weights = input.explode(|(name,weight,_)| Some((name, weight as isize)));
  let parents = input.flat_map(|(name,_,links)| links.into_iter().map(move |link| (link,name.to_string())));

- let total_weights: Collection<_,String> = weights
+ let total_weights: VecCollection<_,String> = weights
  .iterate(|inner| {
  parents.enter(&inner.scope())
  .semijoin(&inner)

advent_of_code_2017/src/bin/day_09.rs
Lines changed: 9 additions & 9 deletions

@@ -20,7 +20,7 @@ fn main() {
  let index = worker.index();
  let peers = worker.peers();

- let worker_input =
+ let worker_input =
  input
  .chars()
  .enumerate()
@@ -37,7 +37,7 @@ fn main() {
  // { next_invalid, garbage }
  //
  // where the first bool indicates that the next character should be ignored,
- // and the second bool indicates that we are in a garbage scope. We will
+ // and the second bool indicates that we are in a garbage scope. We will
  // encode this as the values 0 .. 4, where
  //
  // 0: valid, non-garbage
@@ -48,7 +48,7 @@ fn main() {
  // Each character initially describes a substring of length one, but we will
  // build up the state transition for larger substrings, iteratively.

- let transitions =
+ let transitions =
  input
  .map(|(pos, character)|
  (pos, match character {
@@ -95,7 +95,7 @@ fn main() {
  }

  /// Accumulate data in `collection` into all powers-of-two intervals containing them.
- fn pp_aggregate<G, D, F>(collection: Collection<G, (usize, D)>, combine: F) -> Collection<G, ((usize, usize), D)>
+ fn pp_aggregate<G, D, F>(collection: VecCollection<G, (usize, D)>, combine: F) -> VecCollection<G, ((usize, usize), D)>
  where
  G: Scope<Timestamp: Lattice>,
  D: Data,
@@ -105,7 +105,7 @@ where
  let unit_ranges = collection.map(|(index, data)| ((index, 0), data));

  unit_ranges
- .iterate(|ranges|
+ .iterate(|ranges|

  // Each available range, of size less than usize::max_value(), advertises itself as the range
  // twice as large, aligned to integer multiples of its size. Each range, which may contain at
@@ -126,26 +126,26 @@ where

  /// Produces the accumulated values at each of the `usize` locations in `aggregates` (and others).
  fn pp_broadcast<G, D, B, F>(
- ranges: Collection<G, ((usize, usize), D)>,
+ ranges: VecCollection<G, ((usize, usize), D)>,
  seed: B,
  zero: D,
- combine: F) -> Collection<G, (usize, B)>
+ combine: F) -> VecCollection<G, (usize, B)>
  where
  G: Scope<Timestamp: Lattice+Ord+::std::fmt::Debug>,
  D: Data,
  B: Data+::std::hash::Hash,
  F: Fn(&B, &D) -> B + 'static,
  {
  // Each range proposes an empty first child, to provide for its second child if it has no sibling.
- // This is important if we want to reconstruct
+ // This is important if we want to reconstruct
  let zero_ranges =
  ranges
  .map(move |((pos, log),_)| ((pos, if log > 0 { log - 1 } else { 0 }), zero.clone()))
  .antijoin(&ranges.map(|((pos, log),_)| (pos, log)));

  let aggregates = ranges.concat(&zero_ranges);

- let init_state =
+ let init_state =
  Some(((0, seed), Default::default(), 1))
  .to_stream(&mut aggregates.scope())
  .as_collection();
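
The `pp_aggregate` doc comment and the comment inside its `iterate` describe a doubling scheme: every element starts as a length-one range at its own index, and a range of size 2^k, aligned to a multiple of its size, combines with its sibling into the range of size 2^(k+1) that contains them both. The following plain, non-dataflow sketch is included only to make the shape of that recursion concrete; the function name and representation are invented here and are not taken from day_09.rs, which builds the same structure inside `iterate` so that it stays maintained under input changes.

use std::collections::BTreeMap;

// Sketch only: aggregate items into all powers-of-two intervals that contain them.
// A key (level, index) stands for the interval [index * 2^level, (index + 1) * 2^level).
fn power_of_two_aggregates<D: Clone>(
    items: &[(usize, D)],
    combine: impl Fn(&D, &D) -> D,
) -> BTreeMap<(u32, usize), D> {
    let mut ranges: BTreeMap<(u32, usize), D> = BTreeMap::new();
    // Level 0: each item is a length-one range at its own index.
    for (index, data) in items {
        ranges.insert((0, *index), data.clone());
    }
    let mut level: u32 = 0;
    loop {
        // All ranges at the current level, in increasing index order.
        let current: Vec<(usize, D)> = ranges
            .range((level, 0)..(level + 1, 0))
            .map(|(&(_, index), data)| (index, data.clone()))
            .collect();
        if current.len() <= 1 { break; }
        for (index, data) in current {
            let parent = (level + 1, index / 2);
            match ranges.get(&parent).cloned() {
                // Children arrive in increasing index order, so an existing entry is the
                // left child and `data` is the right child; combine left-then-right,
                // since `combine` need not be commutative.
                Some(left) => { ranges.insert(parent, combine(&left, &data)); }
                // A range with no left sibling is promoted as-is (day_09 instead pads
                // with an explicit zero element in `pp_broadcast`).
                None => { ranges.insert(parent, data); }
            }
        }
        level += 1;
    }
    ranges
}

With `combine` instantiated as composition of the per-character state transitions, the power-of-two ranges roughly correspond to transition summaries for whole substrings, which `pp_broadcast` then uses to recover a value at each individual position.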

differential-dataflow/examples/bfs.rs
Lines changed: 4 additions & 4 deletions

@@ -4,7 +4,7 @@ use timely::dataflow::*;
  use timely::dataflow::operators::probe::Handle;

  use differential_dataflow::input::Input;
- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::operators::*;
  use differential_dataflow::lattice::Lattice;

@@ -91,9 +91,9 @@ fn main() {
  }

  // returns pairs (n, s) indicating node n can be reached from a root in s steps.
- fn bfs<G>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
+ fn bfs<G>(edges: &VecCollection<G, Edge>, roots: &VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
  where
- G: Scope<Timestamp: Lattice+Ord>,
+ G: Scope<Timestamp: Lattice+Ord>,
  {
  // initialize roots as reaching themselves at distance 0
  let nodes = roots.map(|x| (x, 0));
@@ -108,4 +108,4 @@ where
  .concat(&nodes)
  .reduce(|_, s, t| t.push((*s[0].0, 1)))
  })
- }
+ }
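
For context on the renamed signature, here is a hedged, self-contained sketch of how `bfs` is driven. The parts of the function body not visible in the hunks above are reconstructed from the visible lines and from the shape of the signature, and the driver uses `timely::example` with a small hard-coded graph rather than the example's real command-line setup, so treat it as an approximation rather than the file's exact contents.

use timely::dataflow::Scope;

use differential_dataflow::input::Input;
use differential_dataflow::VecCollection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;

type Node = u32;
type Edge = (Node, Node);

fn main() {
    timely::example(|scope| {
        // A small fixed graph and a single root, in place of the example's random graph.
        let edges = scope.new_collection_from(vec![(0u32, 1u32), (1, 2), (1, 3), (3, 4)]).1;
        let roots = scope.new_collection_from(vec![0u32]).1;

        bfs(&edges, &roots)
            .inspect(|x| println!("(node, dist): {:?}", x));
    });
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: &VecCollection<G, Edge>, roots: &VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
where
    G: Scope<Timestamp: Lattice+Ord>,
{
    // initialize roots as reaching themselves at distance 0
    let nodes = roots.map(|x| (x, 0));

    // repeatedly propose distances along edges and keep the minimum per node.
    nodes.iterate(|inner| {
        let edges = edges.enter(&inner.scope());
        let nodes = nodes.enter(&inner.scope());

        inner.join_map(&edges, |_node, dist, next| (*next, dist + 1))
             .concat(&nodes)
             .reduce(|_node, input, output| output.push((*input[0].0, 1)))
    })
}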

differential-dataflow/examples/dynamic.rs
Lines changed: 5 additions & 5 deletions

@@ -4,7 +4,7 @@ use timely::dataflow::*;
  use timely::dataflow::operators::probe::Handle;

  use differential_dataflow::input::Input;
- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::operators::*;
  use differential_dataflow::lattice::Lattice;

@@ -91,9 +91,9 @@ fn main() {
  }

  // returns pairs (n, s) indicating node n can be reached from a root in s steps.
- fn bfs<G>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
+ fn bfs<G>(edges: &VecCollection<G, Edge>, roots: &VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
  where
- G: Scope<Timestamp: Lattice+Ord>,
+ G: Scope<Timestamp: Lattice+Ord>,
  {
  use timely::order::Product;
  use iterate::Variable;
@@ -115,7 +115,7 @@ where
  let inner = feedback_summary::<usize>(1, 1);
  let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner });

- let next =
+ let next =
  label
  .join_map(&edges, |_k,l,d| (*d, l+1))
  .concat(&nodes)
@@ -130,4 +130,4 @@ where
  .leave()
  })

- }
+ }

differential-dataflow/examples/graspan.rs
Lines changed: 7 additions & 7 deletions

@@ -8,11 +8,11 @@ use timely::order::Product;
  use timely::dataflow::Scope;
  use timely::dataflow::scopes::ScopeParent;

- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::lattice::Lattice;
  use differential_dataflow::input::{Input, InputSession};
  use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
- use differential_dataflow::operators::iterate::VariableRow;
+ use differential_dataflow::operators::iterate::VecVariable;
  use differential_dataflow::operators::Threshold;

  type Node = usize;
@@ -83,16 +83,16 @@ type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Tim
  /// An edge variable provides arranged representations of its contents, even before they are
  /// completely defined, in support of recursively defined productions.
  pub struct EdgeVariable<G: Scope<Timestamp: Lattice>> {
- variable: VariableRow<G, Edge, Diff>,
- current: Collection<G, Edge, Diff>,
+ variable: VecVariable<G, Edge, Diff>,
+ current: VecCollection<G, Edge, Diff>,
  forward: Option<Arrange<G, Node, Node, Diff>>,
  reverse: Option<Arrange<G, Node, Node, Diff>>,
  }

  impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
  /// Creates a new variable initialized with `source`.
- pub fn from(source: &Collection<G, Edge>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
- let variable = VariableRow::new(&mut source.scope(), step);
+ pub fn from(source: &VecCollection<G, Edge>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
+ let variable = VecVariable::new(&mut source.scope(), step);
  EdgeVariable {
  variable: variable,
  current: source.clone(),
@@ -101,7 +101,7 @@ impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
  }
  }
  /// Concatenates `production` into the definition of the variable.
- pub fn add_production(&mut self, production: &Collection<G, Edge, Diff>) {
+ pub fn add_production(&mut self, production: &VecCollection<G, Edge, Diff>) {
  self.current = self.current.concat(production);
  }
  /// Finalizes the variable, connecting its recursive definition.

differential-dataflow/examples/interpreted.rs
Lines changed: 4 additions & 4 deletions

@@ -2,7 +2,7 @@ use std::hash::Hash;
  use timely::dataflow::*;
  use timely::dataflow::operators::*;

- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::lattice::Lattice;
  use differential_dataflow::operators::*;

@@ -35,13 +35,13 @@ fn main() {
  println!("loaded {} nodes, {} edges", nodes, edges.len());

  worker.dataflow::<(),_,_>(|scope| {
- interpret(&Collection::new(edges.to_stream(scope)), &[(0,2), (1,2)]);
+ interpret(&VecCollection::new(edges.to_stream(scope)), &[(0,2), (1,2)]);
  });

  }).unwrap();
  }

- fn interpret<G>(edges: &Collection<G, Edge>, relations: &[(usize, usize)]) -> Collection<G, Vec<Node>>
+ fn interpret<G>(edges: &VecCollection<G, Edge>, relations: &[(usize, usize)]) -> VecCollection<G, Vec<Node>>
  where
  G: Scope<Timestamp: Lattice+Hash+Ord>,
  {
@@ -103,4 +103,4 @@ where
  }

  results
- }
+ }

differential-dataflow/examples/iterate_container.rs
Lines changed: 3 additions & 3 deletions

@@ -5,7 +5,7 @@ use timely::dataflow::operators::Operator;
  use timely::order::Product;
  use timely::dataflow::{Scope, StreamCore};
  use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
- use differential_dataflow::{AsCollection, collection::CollectionCore};
+ use differential_dataflow::{AsCollection, Collection};
  use differential_dataflow::input::Input;
  use differential_dataflow::operators::iterate::Variable;
  use differential_dataflow::collection::containers::{Enter, Leave, Negate, ResultsIn};
@@ -52,12 +52,12 @@ fn main() {
  timely::example(|scope| {

  let numbers = scope.new_collection_from(1 .. 10u32).1;
- let numbers: CollectionCore<_, _> = wrap(&numbers.inner).as_collection();
+ let numbers: Collection<_, _> = wrap(&numbers.inner).as_collection();

  scope.iterative::<u64,_,_>(|nested| {
  let summary = Product::new(Default::default(), 1);
  let variable = Variable::new_from(numbers.enter(nested), summary);
- let mapped: CollectionCore<_, _> = variable.inner.unary(Pipeline, "Map", |_,_| {
+ let mapped: Collection<_, _> = variable.inner.unary(Pipeline, "Map", |_,_| {
  |input, output| {
  input.for_each(|time, data| {
  let mut session = output.session(&time);

differential-dataflow/examples/monoid-bfs.rs
Lines changed: 2 additions & 2 deletions

@@ -5,7 +5,7 @@ use timely::dataflow::*;
  use timely::dataflow::operators::probe::Handle;

  use differential_dataflow::input::Input;
- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::operators::*;
  use differential_dataflow::lattice::Lattice;

@@ -123,7 +123,7 @@ fn main() {
  }

  // returns pairs (n, s) indicating node n can be reached from a root in s steps.
- fn bfs<G>(edges: &Collection<G, Edge, MinSum>, roots: &Collection<G, Node, MinSum>) -> Collection<G, Node, MinSum>
+ fn bfs<G>(edges: &VecCollection<G, Edge, MinSum>, roots: &VecCollection<G, Node, MinSum>) -> VecCollection<G, Node, MinSum>
  where
  G: Scope<Timestamp: Lattice+Ord>,
  {

differential-dataflow/examples/pagerank.rs
Lines changed: 2 additions & 2 deletions

@@ -1,7 +1,7 @@
  use timely::order::Product;
  use timely::dataflow::{*, operators::Filter};

- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::lattice::Lattice;
  use differential_dataflow::operators::{*, iterate::Variable};
  use differential_dataflow::input::InputSession;
@@ -77,7 +77,7 @@ fn main() {

  // Returns a weighted collection in which the weight of each node is proportional
  // to its PageRank in the input graph `edges`.
- fn pagerank<G>(iters: Iter, edges: &Collection<G, Edge, Diff>) -> Collection<G, Node, Diff>
+ fn pagerank<G>(iters: Iter, edges: &VecCollection<G, Edge, Diff>) -> VecCollection<G, Node, Diff>
  where
  G: Scope<Timestamp: Lattice>,
  {

differential-dataflow/examples/progress.rs
Lines changed: 15 additions & 15 deletions

@@ -5,7 +5,7 @@ use timely::dataflow::*;
  use timely::dataflow::operators::probe::Handle;

  use differential_dataflow::input::Input;
- use differential_dataflow::Collection;
+ use differential_dataflow::VecCollection;
  use differential_dataflow::operators::*;

  use differential_dataflow::lattice::Lattice;
@@ -115,18 +115,18 @@ fn main() {
  /// The computation to determine this, and to maintain it as times change, is an iterative
  /// computation that propagates times and maintains the minimal elements at each location.
  fn frontier<G, T>(
- nodes: Collection<G, (Target, Source, T::Summary)>,
- edges: Collection<G, (Source, Target)>,
- times: Collection<G, (Location, T)>,
- ) -> Collection<G, (Location, T)>
+ nodes: VecCollection<G, (Target, Source, T::Summary)>,
+ edges: VecCollection<G, (Source, Target)>,
+ times: VecCollection<G, (Location, T)>,
+ ) -> VecCollection<G, (Location, T)>
  where
  G: Scope<Timestamp: Lattice+Ord>,
  T: Timestamp<Summary: differential_dataflow::ExchangeData>,
  {
  // Translate node and edge transitions into a common Location to Location edge with an associated Summary.
  let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
  let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default())));
- let transitions: Collection<G, (Location, (Location, T::Summary))> = nodes.concat(&edges);
+ let transitions: VecCollection<G, (Location, (Location, T::Summary))> = nodes.concat(&edges);

  times
  .iterate(|reach| {
@@ -149,9 +149,9 @@ where

  /// Summary paths from locations to operator zero inputs.
  fn summarize<G, T>(
- nodes: Collection<G, (Target, Source, T::Summary)>,
- edges: Collection<G, (Source, Target)>,
- ) -> Collection<G, (Location, (Location, T::Summary))>
+ nodes: VecCollection<G, (Target, Source, T::Summary)>,
+ edges: VecCollection<G, (Source, Target)>,
+ ) -> VecCollection<G, (Location, (Location, T::Summary))>
  where
  G: Scope<Timestamp: Lattice+Ord>,
  T: Timestamp<Summary: differential_dataflow::ExchangeData+std::hash::Hash>,
@@ -166,7 +166,7 @@ where
  // Retain node connections along "default" timestamp summaries.
  let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary)));
  let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default())));
- let transitions: Collection<G, (Location, (Location, T::Summary))> = nodes.concat(&edges);
+ let transitions: VecCollection<G, (Location, (Location, T::Summary))> = nodes.concat(&edges);

  zero_inputs
  .iterate(|summaries| {
@@ -192,9 +192,9 @@ where

  /// Identifies cycles along paths that do not increment timestamps.
  fn find_cycles<G: Scope, T: Timestamp>(
- nodes: Collection<G, (Target, Source, T::Summary)>,
- edges: Collection<G, (Source, Target)>,
- ) -> Collection<G, (Location, Location)>
+ nodes: VecCollection<G, (Target, Source, T::Summary)>,
+ edges: VecCollection<G, (Source, Target)>,
+ ) -> VecCollection<G, (Location, Location)>
  where
  G: Scope<Timestamp: Lattice+Ord>,
  T: Timestamp<Summary: differential_dataflow::ExchangeData>,
@@ -209,7 +209,7 @@ where
  }
  });
  let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target)));
- let transitions: Collection<G, (Location, Location)> = nodes.concat(&edges);
+ let transitions: VecCollection<G, (Location, Location)> = nodes.concat(&edges);

  // Repeatedly restrict to locations with an incoming path.
  transitions
@@ -223,4 +223,4 @@ where
  .semijoin(&active)
  })
  .consolidate()
- }
+ }
