Skip to content

Commit d7489ee

Browse files
committed
compute: tokenize mz_join_core
This commit adds a shutdown token check to the `mz_join_core` linear join implementation. When the dataflow is shutting down, this makes the operator discard all its existing work and new input data, rather than processing it. As a result, differential join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators. Unfortunately, we can't make the same change for the DD join operator. We could add a token check into the result closure we pass to that operator, but the shutdown check would interfere with the fueling of the DD join operator. Fuel is consumed based on the number of updates emitted. When the token is dropped, the join closure stops producing updates, which means the operator stops consuming fuel, so it does not yield anymore until it has drained all its inputs. If there are many inputs left, the replica may not accept commands for potentially quite a long time.
1 parent 50fb688 commit d7489ee

File tree

2 files changed

+135
-38
lines changed

2 files changed

+135
-38
lines changed

src/compute/src/render/join/linear_join.rs

Lines changed: 115 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use timely::progress::timestamp::{Refines, Timestamp};
3030

3131
use crate::extensions::arrange::MzArrange;
3232
use crate::render::context::{
33-
ArrangementFlavor, CollectionBundle, Context, SpecializedArrangement,
33+
ArrangementFlavor, CollectionBundle, Context, ShutdownToken, SpecializedArrangement,
3434
SpecializedArrangementImport,
3535
};
3636
use crate::render::join::mz_join_core::mz_join_core;
@@ -72,6 +72,7 @@ impl LinearJoinSpec {
7272
&self,
7373
arranged1: &Arranged<G, Tr1>,
7474
arranged2: &Arranged<G, Tr2>,
75+
shutdown_token: ShutdownToken,
7576
result: L,
7677
) -> Collection<G, I::Item, Diff>
7778
where
@@ -95,11 +96,11 @@ impl LinearJoinSpec {
9596
}
9697
(Materialize, ByWork(limit)) => {
9798
let yield_fn = move |_start, work| work >= limit;
98-
mz_join_core(arranged1, arranged2, result, yield_fn)
99+
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
99100
}
100101
(Materialize, ByTime(limit)) => {
101102
let yield_fn = move |start: Instant, _work| start.elapsed() >= limit;
102-
mz_join_core(arranged1, arranged2, result, yield_fn)
103+
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
103104
}
104105
}
105106
}
@@ -198,6 +199,7 @@ where
198199
inputs[stage_plan.lookup_relation].enter_region(inner),
199200
stage_plan,
200201
&mut errors,
202+
self.shutdown_token.clone(),
201203
self.enable_specialized_arrangements,
202204
);
203205
// Update joined results and capture any errors.
@@ -256,6 +258,7 @@ fn differential_join<G, T>(
256258
lookup_relation: _,
257259
}: LinearStagePlan,
258260
errors: &mut Vec<Collection<G, DataflowError, Diff>>,
261+
shutdown_token: ShutdownToken,
259262
_enable_specialized_arrangements: bool,
260263
) -> Collection<G, Row, Diff>
261264
where
@@ -305,31 +308,51 @@ where
305308
}
306309
JoinedFlavor::Local(local) => match arrangement {
307310
ArrangementFlavor::Local(oks, errs1) => {
308-
let (oks, errs2) =
309-
dispatch_differential_join_inner_local_local(join_spec, local, oks, closure);
311+
let (oks, errs2) = dispatch_differential_join_inner_local_local(
312+
join_spec,
313+
local,
314+
oks,
315+
closure,
316+
shutdown_token,
317+
);
310318
errors.push(errs1.as_collection(|k, _v| k.clone()));
311319
errors.extend(errs2);
312320
oks
313321
}
314322
ArrangementFlavor::Trace(_gid, oks, errs1) => {
315-
let (oks, errs2) =
316-
dispatch_differential_join_inner_local_trace(join_spec, local, oks, closure);
323+
let (oks, errs2) = dispatch_differential_join_inner_local_trace(
324+
join_spec,
325+
local,
326+
oks,
327+
closure,
328+
shutdown_token,
329+
);
317330
errors.push(errs1.as_collection(|k, _v| k.clone()));
318331
errors.extend(errs2);
319332
oks
320333
}
321334
},
322335
JoinedFlavor::Trace(trace) => match arrangement {
323336
ArrangementFlavor::Local(oks, errs1) => {
324-
let (oks, errs2) =
325-
dispatch_differential_join_inner_trace_local(join_spec, trace, oks, closure);
337+
let (oks, errs2) = dispatch_differential_join_inner_trace_local(
338+
join_spec,
339+
trace,
340+
oks,
341+
closure,
342+
shutdown_token,
343+
);
326344
errors.push(errs1.as_collection(|k, _v| k.clone()));
327345
errors.extend(errs2);
328346
oks
329347
}
330348
ArrangementFlavor::Trace(_gid, oks, errs1) => {
331-
let (oks, errs2) =
332-
dispatch_differential_join_inner_trace_trace(join_spec, trace, oks, closure);
349+
let (oks, errs2) = dispatch_differential_join_inner_trace_trace(
350+
join_spec,
351+
trace,
352+
oks,
353+
closure,
354+
shutdown_token,
355+
);
333356
errors.push(errs1.as_collection(|k, _v| k.clone()));
334357
errors.extend(errs2);
335358
oks
@@ -344,6 +367,7 @@ fn dispatch_differential_join_inner_local_local<G>(
344367
prev_keyed: SpecializedArrangement<G>,
345368
next_input: SpecializedArrangement<G>,
346369
closure: JoinClosure,
370+
shutdown_token: ShutdownToken,
347371
) -> (
348372
Collection<G, Row, Diff>,
349373
Option<Collection<G, DataflowError, Diff>>,
@@ -364,6 +388,7 @@ where
364388
Some(vec![]),
365389
Some(vec![]),
366390
closure,
391+
shutdown_token,
367392
),
368393
(
369394
SpecializedArrangement::RowUnit(prev_keyed),
@@ -376,6 +401,7 @@ where
376401
Some(vec![]),
377402
None,
378403
closure,
404+
shutdown_token,
379405
),
380406
(
381407
SpecializedArrangement::RowRow(prev_keyed),
@@ -388,11 +414,12 @@ where
388414
None,
389415
Some(vec![]),
390416
closure,
417+
shutdown_token,
391418
),
392419
(
393420
SpecializedArrangement::RowRow(prev_keyed),
394421
SpecializedArrangement::RowRow(next_input),
395-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
422+
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure, shutdown_token),
396423
}
397424
}
398425

@@ -402,6 +429,7 @@ fn dispatch_differential_join_inner_local_trace<G, T>(
402429
prev_keyed: SpecializedArrangement<G>,
403430
next_input: SpecializedArrangementImport<G, T>,
404431
closure: JoinClosure,
432+
shutdown_token: ShutdownToken,
405433
) -> (
406434
Collection<G, Row, Diff>,
407435
Option<Collection<G, DataflowError, Diff>>,
@@ -423,6 +451,7 @@ where
423451
Some(vec![]),
424452
Some(vec![]),
425453
closure,
454+
shutdown_token,
426455
),
427456
(
428457
SpecializedArrangement::RowUnit(prev_keyed),
@@ -435,6 +464,7 @@ where
435464
Some(vec![]),
436465
None,
437466
closure,
467+
shutdown_token,
438468
),
439469
(
440470
SpecializedArrangement::RowRow(prev_keyed),
@@ -447,11 +477,21 @@ where
447477
None,
448478
Some(vec![]),
449479
closure,
480+
shutdown_token,
450481
),
451482
(
452483
SpecializedArrangement::RowRow(prev_keyed),
453484
SpecializedArrangementImport::RowRow(next_input),
454-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
485+
) => differential_join_inner(
486+
join_spec,
487+
prev_keyed,
488+
next_input,
489+
None,
490+
None,
491+
None,
492+
closure,
493+
shutdown_token,
494+
),
455495
}
456496
}
457497

@@ -461,6 +501,7 @@ fn dispatch_differential_join_inner_trace_local<G, T>(
461501
prev_keyed: SpecializedArrangementImport<G, T>,
462502
next_input: SpecializedArrangement<G>,
463503
closure: JoinClosure,
504+
shutdown_token: ShutdownToken,
464505
) -> (
465506
Collection<G, Row, Diff>,
466507
Option<Collection<G, DataflowError, Diff>>,
@@ -482,6 +523,7 @@ where
482523
Some(vec![]),
483524
Some(vec![]),
484525
closure,
526+
shutdown_token,
485527
),
486528
(
487529
SpecializedArrangementImport::RowUnit(prev_keyed),
@@ -494,6 +536,7 @@ where
494536
Some(vec![]),
495537
None,
496538
closure,
539+
shutdown_token,
497540
),
498541
(
499542
SpecializedArrangementImport::RowRow(prev_keyed),
@@ -506,11 +549,21 @@ where
506549
None,
507550
Some(vec![]),
508551
closure,
552+
shutdown_token,
509553
),
510554
(
511555
SpecializedArrangementImport::RowRow(prev_keyed),
512556
SpecializedArrangement::RowRow(next_input),
513-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
557+
) => differential_join_inner(
558+
join_spec,
559+
prev_keyed,
560+
next_input,
561+
None,
562+
None,
563+
None,
564+
closure,
565+
shutdown_token,
566+
),
514567
}
515568
}
516569

@@ -520,6 +573,7 @@ fn dispatch_differential_join_inner_trace_trace<G, T>(
520573
prev_keyed: SpecializedArrangementImport<G, T>,
521574
next_input: SpecializedArrangementImport<G, T>,
522575
closure: JoinClosure,
576+
shutdown_token: ShutdownToken,
523577
) -> (
524578
Collection<G, Row, Diff>,
525579
Option<Collection<G, DataflowError, Diff>>,
@@ -541,6 +595,7 @@ where
541595
Some(vec![]),
542596
Some(vec![]),
543597
closure,
598+
shutdown_token,
544599
),
545600
(
546601
SpecializedArrangementImport::RowUnit(prev_keyed),
@@ -553,6 +608,7 @@ where
553608
Some(vec![]),
554609
None,
555610
closure,
611+
shutdown_token,
556612
),
557613
(
558614
SpecializedArrangementImport::RowRow(prev_keyed),
@@ -565,11 +621,21 @@ where
565621
None,
566622
Some(vec![]),
567623
closure,
624+
shutdown_token,
568625
),
569626
(
570627
SpecializedArrangementImport::RowRow(prev_keyed),
571628
SpecializedArrangementImport::RowRow(next_input),
572-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
629+
) => differential_join_inner(
630+
join_spec,
631+
prev_keyed,
632+
next_input,
633+
None,
634+
None,
635+
None,
636+
closure,
637+
shutdown_token,
638+
),
573639
}
574640
}
575641

@@ -587,6 +653,7 @@ fn differential_join_inner<G, T, Tr1, Tr2, K, V1, V2>(
587653
prev_types: Option<Vec<ColumnType>>,
588654
next_types: Option<Vec<ColumnType>>,
589655
closure: JoinClosure,
656+
shutdown_token: ShutdownToken,
590657
) -> (
591658
Collection<G, Row, Diff>,
592659
Option<Collection<G, DataflowError, Diff>>,
@@ -611,18 +678,23 @@ where
611678

612679
if closure.could_error() {
613680
let (oks, err) = join_spec
614-
.render(&prev_keyed, &next_input, move |key, old, new| {
615-
let key = key.into_row(&mut key_buf, key_types.as_deref());
616-
let old = old.into_row(&mut old_buf, prev_types.as_deref());
617-
let new = new.into_row(&mut new_buf, next_types.as_deref());
618-
619-
let temp_storage = RowArena::new();
620-
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
621-
closure
622-
.apply(&mut datums_local, &temp_storage, &mut row_builder)
623-
.map_err(DataflowError::from)
624-
.transpose()
625-
})
681+
.render(
682+
&prev_keyed,
683+
&next_input,
684+
shutdown_token,
685+
move |key, old, new| {
686+
let key = key.into_row(&mut key_buf, key_types.as_deref());
687+
let old = old.into_row(&mut old_buf, prev_types.as_deref());
688+
let new = new.into_row(&mut new_buf, next_types.as_deref());
689+
690+
let temp_storage = RowArena::new();
691+
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
692+
closure
693+
.apply(&mut datums_local, &temp_storage, &mut row_builder)
694+
.map_err(DataflowError::from)
695+
.transpose()
696+
},
697+
)
626698
.inner
627699
.ok_err(|(x, t, d)| {
628700
// TODO(mcsherry): consider `ok_err()` for `Collection`.
@@ -634,17 +706,22 @@ where
634706

635707
(oks.as_collection(), Some(err.as_collection()))
636708
} else {
637-
let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| {
638-
let key = key.into_row(&mut key_buf, key_types.as_deref());
639-
let old = old.into_row(&mut old_buf, prev_types.as_deref());
640-
let new = new.into_row(&mut new_buf, next_types.as_deref());
641-
642-
let temp_storage = RowArena::new();
643-
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
644-
closure
645-
.apply(&mut datums_local, &temp_storage, &mut row_builder)
646-
.expect("Closure claimed to never error")
647-
});
709+
let oks = join_spec.render(
710+
&prev_keyed,
711+
&next_input,
712+
shutdown_token,
713+
move |key, old, new| {
714+
let key = key.into_row(&mut key_buf, key_types.as_deref());
715+
let old = old.into_row(&mut old_buf, prev_types.as_deref());
716+
let new = new.into_row(&mut new_buf, next_types.as_deref());
717+
718+
let temp_storage = RowArena::new();
719+
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
720+
closure
721+
.apply(&mut datums_local, &temp_storage, &mut row_builder)
722+
.expect("Closure claimed to never error")
723+
},
724+
);
648725

649726
(oks, None)
650727
}

0 commit comments

Comments
 (0)