 //!
 //! Consult [TopKPlan] documentation for details.

+use std::collections::HashMap;
+
 use differential_dataflow::hashable::Hashable;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::ArrangeBySelf;
@@ -19,6 +21,8 @@ use differential_dataflow::operators::Consolidate;
 use differential_dataflow::trace::implementations::ord::OrdValSpine;
 use differential_dataflow::AsCollection;
 use differential_dataflow::Collection;
+use timely::dataflow::channels::pact::Pipeline;
+use timely::dataflow::operators::Operator;
 use timely::dataflow::Scope;

 use mz_compute_client::plan::top_k::{
                 arity,
                 limit,
             }) => {
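+                // Key each record by its group: extract the `group_key` columns from the
+                // row and pack them into a new `Row`.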
+                let mut datum_vec = mz_repr::DatumVec::new();
+                let collection = ok_input.map(move |row| {
+                    let group_row = {
+                        let datums = datum_vec.borrow_with(&row);
+                        let iterator = group_key.iter().map(|i| datums[*i]);
+                        let total_size = mz_repr::datums_size(iterator.clone());
+                        let mut group_row = Row::with_capacity(total_size);
+                        group_row.packer().extend(iterator);
+                        group_row
+                    };
+                    (group_row, row)
+                });
+
+                // For monotonic inputs, we are able to thin the input relation in two stages:
+                // 1. First, we can do an intra-timestamp thinning which has the advantage of
+                //    being computed in a streaming fashion, even for the initial snapshot.
+                // 2. Then, we can do inter-timestamp thinning by feeding back negations for
+                //    any records that have been invalidated.
+                let collection = if let Some(limit) = limit {
+                    render_intra_ts_thinning(collection, order_key.clone(), limit)
+                } else {
+                    collection
+                };
+
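+                // Extend the key with the row's hash, matching the ((group, hash), row)
+                // shape that `build_topk_stage` consumes below.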
+                let collection =
+                    collection.map(|(group_row, row)| ((group_row, row.hashed()), row));
+
                 // For monotonic inputs, we are able to retract inputs that can no longer be produced
                 // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                 // outputs, and can be removed. The simplest form of this is when `offset == 0` and
@@ -65,17 +96,24 @@ where
                 // of `offset` and `limit`, discarding only the records not produced in the intermediate
                 // stage.
                 use differential_dataflow::operators::iterate::Variable;
-                let delay = std::time::Duration::from_nanos(10_000_000_000);
+                let delay = std::time::Duration::from_secs(10);
                 let retractions = Variable::new(
                     &mut ok_input.scope(),
                     <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                         delay.try_into().expect("must fit"),
                     ),
                 );
-                let thinned = ok_input.concat(&retractions.negate());
-                let result = build_topk(thinned, group_key, order_key, 0, limit, arity);
-                retractions.set(&ok_input.concat(&result.negate()));
-                result
+                let thinned = collection.concat(&retractions.negate());
+
+                // As an additional optimization, we can skip creating the full topk hierarchy
+                // here since we now have an upper bound on the number of records due to the
+                // intra-ts thinning. The maximum number of records per timestamp is
+                // (num_workers * limit), which we expect to be a small number, and so we render
+                // a single topk stage.
+                let result = build_topk_stage(thinned, order_key, 1u64, 0, limit, arity);
+                retractions.set(&collection.concat(&result.negate()));
+
+                result.map(|((_key, _hash), row)| row)
             }
             TopKPlan::Basic(BasicTopKPlan {
                 group_key,
@@ -317,6 +355,148 @@ where
         // TODO(#7331): Here we discard the arranged output.
         result.as_collection(|_k, v| v.clone())
     }
+
+    fn render_intra_ts_thinning<G>(
+        collection: Collection<G, (Row, Row), Diff>,
+        order_key: Vec<mz_expr::ColumnOrder>,
+        limit: usize,
+    ) -> Collection<G, (Row, Row), Diff>
+    where
+        G: Scope,
+        G::Timestamp: Lattice,
+    {
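+        // For each outstanding timely timestamp, accumulate a `TopKBatch` per
+        // (group row, record time) pair until that timestamp is complete.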
+        let mut aggregates = HashMap::new();
+        let mut vector = Vec::new();
+        collection
+            .inner
+            .unary_notify(
+                Pipeline,
+                "TopKIntraTimeThinning",
+                [],
+                move |input, output, notificator| {
+                    while let Some((time, data)) = input.next() {
+                        data.swap(&mut vector);
+                        let agg_time = aggregates
+                            .entry(time.time().clone())
+                            .or_insert_with(HashMap::new);
+                        for ((grp_row, row), record_time, diff) in vector.drain(..) {
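+                            // Wrap the row in a monoid carrying the ordering key, so that
+                            // `TopKBatch` (which relies on `Ord`) ranks rows by `order_key`.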
+                            let monoid = monoids::Top1Monoid {
+                                row,
+                                order_key: order_key.clone(),
+                            };
+                            let topk = agg_time
+                                .entry((grp_row, record_time))
+                                .or_insert_with(move || {
+                                    topk_agg::TopKBatch::new(limit.try_into().expect("must fit"))
+                                });
+                            topk.update(monoid, diff);
+                        }
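+                        // Request a notification for when this timestamp is complete, so the
+                        // accumulated thinned updates can be emitted.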
+                        notificator.notify_at(time.retain());
+                    }
+
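+                    // Flush the accumulations for every timestamp that is now complete.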
+                    notificator.for_each(|time, _, _| {
+                        if let Some(aggs) = aggregates.remove(time.time()) {
+                            let mut session = output.session(&time);
+                            for ((grp_row, record_time), topk) in aggs {
+                                session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
+                                    ((grp_row.clone(), monoid.row), record_time.clone(), diff)
+                                }))
+                            }
+                        }
+                    });
+                },
+            )
+            .as_collection()
+    }
+}
+}
+
+/// Types for in-place intra-ts aggregation of monotonic streams.
+pub mod topk_agg {
+    use differential_dataflow::consolidation;
+    use smallvec::SmallVec;
+
+    pub struct TopKBatch<T> {
+        updates: SmallVec<[(T, i64); 16]>,
+        clean: usize,
+        limit: i64,
+    }
+
+    impl<T: Ord> TopKBatch<T> {
+        pub fn new(limit: i64) -> Self {
+            Self {
+                updates: SmallVec::new(),
+                clean: 0,
+                limit,
+            }
+        }
+
+        /// Adds a new update, for `item` with `value`.
+        ///
+        /// This could be optimized to perform compaction when the number of "dirty" elements
+        /// exceeds half the length of the list, which would keep the total footprint within
+        /// reasonable bounds even under an arbitrary number of updates. This has a cost, and it
+        /// isn't clear whether it is worth paying without some experimentation.
+        #[inline]
+        pub fn update(&mut self, item: T, value: i64) {
+            self.updates.push((item, value));
+            self.maintain_bounds();
+        }
+
+        /// Compact the internal representation.
+        ///
+        /// This method sorts `self.updates` and consolidates elements with equal items,
+        /// discarding any whose accumulation is zero. It is optimized to only do this if
+        /// the number of dirty elements is nonzero.
+        #[inline]
+        pub fn compact(&mut self) {
+            if self.clean < self.updates.len() && self.updates.len() > 1 {
+                let len = consolidation::consolidate_slice(&mut self.updates);
+                self.updates.truncate(len);
+
+                // We can now retain only the first K records and throw away everything else.
+                let mut limit = self.limit;
+                self.updates.retain(|x| {
+                    if limit > 0 {
+                        limit -= x.1;
+                        true
+                    } else {
+                        false
+                    }
+                });
+                // By the end of the loop above, `limit` will be less than or equal to zero. The
+                // case where it goes negative is when the last record we retained had more copies
+                // than necessary. For this reason we need to do one final adjustment of the diff
+                // field of the last record so that the total sum of the diffs in the batch is K.
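+                // For example, with `limit == 3` and consolidated updates [("a", 2), ("b", 2)],
+                // the loop above ends with `limit == -1`; the adjustment below turns ("b", 2)
+                // into ("b", 1), making the diffs sum to exactly 3.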
+                if let Some(item) = self.updates.last_mut() {
+                    // We are subtracting the limit *negated*, therefore we are subtracting a value
+                    // that is *greater* than or equal to zero, which represents the excess.
+                    item.1 -= -limit;
+                }
+            }
+            self.clean = self.updates.len();
+        }
+
+        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
+        /// This function tries to minimize work by only compacting if enough work has accumulated.
+        fn maintain_bounds(&mut self) {
+            // If we have more than 32 elements and at least half of them are not clean, compact.
+            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
+                self.compact()
+            }
+        }
+    }
+
+    impl<T: Ord> IntoIterator for TopKBatch<T> {
+        type Item = (T, i64);
+        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
+
+        fn into_iter(mut self) -> Self::IntoIter {
+            self.compact();
+            self.updates.into_iter()
+        }
     }
 }

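To make `TopKBatch`'s truncation semantics concrete, here is a small usage sketch (assuming the `topk_agg` module above is in scope; plain `i32` items stand in for the `Top1Monoid` values used in the dataflow):

```rust
use topk_agg::TopKBatch;

fn main() {
    // Keep at most 3 copies across all items.
    let mut batch = TopKBatch::<i32>::new(3);

    // Feed updates with multiplicities: item 1 twice, item 2 twice, item 3 once.
    batch.update(1, 2);
    batch.update(2, 2);
    batch.update(3, 1);

    // Draining compacts first: items are consolidated in `Ord` order and retained
    // until the limit is exhausted, with the last retained diff trimmed so the
    // total sums to the limit.
    let kept: Vec<(i32, i64)> = batch.into_iter().collect();
    assert_eq!(kept, vec![(1, 2), (2, 1)]);
}
```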