Skip to content

Commit 4c7e240

Browse files
committed
compute: tokenize mz_join_core
This commit adds a shutdown token check to the `mz_join_core` linear join implementation. When the dataflow is shutting down, this makes the operator discard all its existing work and new input data, rather than processing it. As a result, differential join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators. Unfortunately, we can't make the same change for the DD join operator. We could add a token check into the result closure we pass to that operator, but the shutdown check would interfere with the fueling of the DD join operator. Fuel is consumed based on the number of updates emitted. When the token is dropped, the join closure stops producing updates, which means the operator stops consuming fuel, so it does not yield anymore until it has drained all its inputs. If there are many inputs left, the replica may not accept commands for potentially quite a long time.
1 parent 50fb688 commit 4c7e240

File tree

2 files changed

+144
-38
lines changed

2 files changed

+144
-38
lines changed

src/compute/src/render/join/linear_join.rs

Lines changed: 124 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use timely::progress::timestamp::{Refines, Timestamp};
3030

3131
use crate::extensions::arrange::MzArrange;
3232
use crate::render::context::{
33-
ArrangementFlavor, CollectionBundle, Context, SpecializedArrangement,
33+
ArrangementFlavor, CollectionBundle, Context, ShutdownToken, SpecializedArrangement,
3434
SpecializedArrangementImport,
3535
};
3636
use crate::render::join::mz_join_core::mz_join_core;
@@ -72,6 +72,7 @@ impl LinearJoinSpec {
7272
&self,
7373
arranged1: &Arranged<G, Tr1>,
7474
arranged2: &Arranged<G, Tr2>,
75+
shutdown_token: ShutdownToken,
7576
result: L,
7677
) -> Collection<G, I::Item, Diff>
7778
where
@@ -95,11 +96,11 @@ impl LinearJoinSpec {
9596
}
9697
(Materialize, ByWork(limit)) => {
9798
let yield_fn = move |_start, work| work >= limit;
98-
mz_join_core(arranged1, arranged2, result, yield_fn)
99+
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
99100
}
100101
(Materialize, ByTime(limit)) => {
101102
let yield_fn = move |start: Instant, _work| start.elapsed() >= limit;
102-
mz_join_core(arranged1, arranged2, result, yield_fn)
103+
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
103104
}
104105
}
105106
}
@@ -198,6 +199,7 @@ where
198199
inputs[stage_plan.lookup_relation].enter_region(inner),
199200
stage_plan,
200201
&mut errors,
202+
self.shutdown_token.clone(),
201203
self.enable_specialized_arrangements,
202204
);
203205
// Update joined results and capture any errors.
@@ -256,6 +258,7 @@ fn differential_join<G, T>(
256258
lookup_relation: _,
257259
}: LinearStagePlan,
258260
errors: &mut Vec<Collection<G, DataflowError, Diff>>,
261+
shutdown_token: ShutdownToken,
259262
_enable_specialized_arrangements: bool,
260263
) -> Collection<G, Row, Diff>
261264
where
@@ -305,31 +308,51 @@ where
305308
}
306309
JoinedFlavor::Local(local) => match arrangement {
307310
ArrangementFlavor::Local(oks, errs1) => {
308-
let (oks, errs2) =
309-
dispatch_differential_join_inner_local_local(join_spec, local, oks, closure);
311+
let (oks, errs2) = dispatch_differential_join_inner_local_local(
312+
join_spec,
313+
local,
314+
oks,
315+
closure,
316+
shutdown_token,
317+
);
310318
errors.push(errs1.as_collection(|k, _v| k.clone()));
311319
errors.extend(errs2);
312320
oks
313321
}
314322
ArrangementFlavor::Trace(_gid, oks, errs1) => {
315-
let (oks, errs2) =
316-
dispatch_differential_join_inner_local_trace(join_spec, local, oks, closure);
323+
let (oks, errs2) = dispatch_differential_join_inner_local_trace(
324+
join_spec,
325+
local,
326+
oks,
327+
closure,
328+
shutdown_token,
329+
);
317330
errors.push(errs1.as_collection(|k, _v| k.clone()));
318331
errors.extend(errs2);
319332
oks
320333
}
321334
},
322335
JoinedFlavor::Trace(trace) => match arrangement {
323336
ArrangementFlavor::Local(oks, errs1) => {
324-
let (oks, errs2) =
325-
dispatch_differential_join_inner_trace_local(join_spec, trace, oks, closure);
337+
let (oks, errs2) = dispatch_differential_join_inner_trace_local(
338+
join_spec,
339+
trace,
340+
oks,
341+
closure,
342+
shutdown_token,
343+
);
326344
errors.push(errs1.as_collection(|k, _v| k.clone()));
327345
errors.extend(errs2);
328346
oks
329347
}
330348
ArrangementFlavor::Trace(_gid, oks, errs1) => {
331-
let (oks, errs2) =
332-
dispatch_differential_join_inner_trace_trace(join_spec, trace, oks, closure);
349+
let (oks, errs2) = dispatch_differential_join_inner_trace_trace(
350+
join_spec,
351+
trace,
352+
oks,
353+
closure,
354+
shutdown_token,
355+
);
333356
errors.push(errs1.as_collection(|k, _v| k.clone()));
334357
errors.extend(errs2);
335358
oks
@@ -344,6 +367,7 @@ fn dispatch_differential_join_inner_local_local<G>(
344367
prev_keyed: SpecializedArrangement<G>,
345368
next_input: SpecializedArrangement<G>,
346369
closure: JoinClosure,
370+
shutdown_token: ShutdownToken,
347371
) -> (
348372
Collection<G, Row, Diff>,
349373
Option<Collection<G, DataflowError, Diff>>,
@@ -364,6 +388,7 @@ where
364388
Some(vec![]),
365389
Some(vec![]),
366390
closure,
391+
shutdown_token,
367392
),
368393
(
369394
SpecializedArrangement::RowUnit(prev_keyed),
@@ -376,6 +401,7 @@ where
376401
Some(vec![]),
377402
None,
378403
closure,
404+
shutdown_token,
379405
),
380406
(
381407
SpecializedArrangement::RowRow(prev_keyed),
@@ -388,11 +414,21 @@ where
388414
None,
389415
Some(vec![]),
390416
closure,
417+
shutdown_token,
391418
),
392419
(
393420
SpecializedArrangement::RowRow(prev_keyed),
394421
SpecializedArrangement::RowRow(next_input),
395-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
422+
) => differential_join_inner(
423+
join_spec,
424+
prev_keyed,
425+
next_input,
426+
None,
427+
None,
428+
None,
429+
closure,
430+
shutdown_token,
431+
),
396432
}
397433
}
398434

@@ -402,6 +438,7 @@ fn dispatch_differential_join_inner_local_trace<G, T>(
402438
prev_keyed: SpecializedArrangement<G>,
403439
next_input: SpecializedArrangementImport<G, T>,
404440
closure: JoinClosure,
441+
shutdown_token: ShutdownToken,
405442
) -> (
406443
Collection<G, Row, Diff>,
407444
Option<Collection<G, DataflowError, Diff>>,
@@ -423,6 +460,7 @@ where
423460
Some(vec![]),
424461
Some(vec![]),
425462
closure,
463+
shutdown_token,
426464
),
427465
(
428466
SpecializedArrangement::RowUnit(prev_keyed),
@@ -435,6 +473,7 @@ where
435473
Some(vec![]),
436474
None,
437475
closure,
476+
shutdown_token,
438477
),
439478
(
440479
SpecializedArrangement::RowRow(prev_keyed),
@@ -447,11 +486,21 @@ where
447486
None,
448487
Some(vec![]),
449488
closure,
489+
shutdown_token,
450490
),
451491
(
452492
SpecializedArrangement::RowRow(prev_keyed),
453493
SpecializedArrangementImport::RowRow(next_input),
454-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
494+
) => differential_join_inner(
495+
join_spec,
496+
prev_keyed,
497+
next_input,
498+
None,
499+
None,
500+
None,
501+
closure,
502+
shutdown_token,
503+
),
455504
}
456505
}
457506

@@ -461,6 +510,7 @@ fn dispatch_differential_join_inner_trace_local<G, T>(
461510
prev_keyed: SpecializedArrangementImport<G, T>,
462511
next_input: SpecializedArrangement<G>,
463512
closure: JoinClosure,
513+
shutdown_token: ShutdownToken,
464514
) -> (
465515
Collection<G, Row, Diff>,
466516
Option<Collection<G, DataflowError, Diff>>,
@@ -482,6 +532,7 @@ where
482532
Some(vec![]),
483533
Some(vec![]),
484534
closure,
535+
shutdown_token,
485536
),
486537
(
487538
SpecializedArrangementImport::RowUnit(prev_keyed),
@@ -494,6 +545,7 @@ where
494545
Some(vec![]),
495546
None,
496547
closure,
548+
shutdown_token,
497549
),
498550
(
499551
SpecializedArrangementImport::RowRow(prev_keyed),
@@ -506,11 +558,21 @@ where
506558
None,
507559
Some(vec![]),
508560
closure,
561+
shutdown_token,
509562
),
510563
(
511564
SpecializedArrangementImport::RowRow(prev_keyed),
512565
SpecializedArrangement::RowRow(next_input),
513-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
566+
) => differential_join_inner(
567+
join_spec,
568+
prev_keyed,
569+
next_input,
570+
None,
571+
None,
572+
None,
573+
closure,
574+
shutdown_token,
575+
),
514576
}
515577
}
516578

@@ -520,6 +582,7 @@ fn dispatch_differential_join_inner_trace_trace<G, T>(
520582
prev_keyed: SpecializedArrangementImport<G, T>,
521583
next_input: SpecializedArrangementImport<G, T>,
522584
closure: JoinClosure,
585+
shutdown_token: ShutdownToken,
523586
) -> (
524587
Collection<G, Row, Diff>,
525588
Option<Collection<G, DataflowError, Diff>>,
@@ -541,6 +604,7 @@ where
541604
Some(vec![]),
542605
Some(vec![]),
543606
closure,
607+
shutdown_token,
544608
),
545609
(
546610
SpecializedArrangementImport::RowUnit(prev_keyed),
@@ -553,6 +617,7 @@ where
553617
Some(vec![]),
554618
None,
555619
closure,
620+
shutdown_token,
556621
),
557622
(
558623
SpecializedArrangementImport::RowRow(prev_keyed),
@@ -565,11 +630,21 @@ where
565630
None,
566631
Some(vec![]),
567632
closure,
633+
shutdown_token,
568634
),
569635
(
570636
SpecializedArrangementImport::RowRow(prev_keyed),
571637
SpecializedArrangementImport::RowRow(next_input),
572-
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
638+
) => differential_join_inner(
639+
join_spec,
640+
prev_keyed,
641+
next_input,
642+
None,
643+
None,
644+
None,
645+
closure,
646+
shutdown_token,
647+
),
573648
}
574649
}
575650

@@ -587,6 +662,7 @@ fn differential_join_inner<G, T, Tr1, Tr2, K, V1, V2>(
587662
prev_types: Option<Vec<ColumnType>>,
588663
next_types: Option<Vec<ColumnType>>,
589664
closure: JoinClosure,
665+
shutdown_token: ShutdownToken,
590666
) -> (
591667
Collection<G, Row, Diff>,
592668
Option<Collection<G, DataflowError, Diff>>,
@@ -611,18 +687,23 @@ where
611687

612688
if closure.could_error() {
613689
let (oks, err) = join_spec
614-
.render(&prev_keyed, &next_input, move |key, old, new| {
615-
let key = key.into_row(&mut key_buf, key_types.as_deref());
616-
let old = old.into_row(&mut old_buf, prev_types.as_deref());
617-
let new = new.into_row(&mut new_buf, next_types.as_deref());
618-
619-
let temp_storage = RowArena::new();
620-
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
621-
closure
622-
.apply(&mut datums_local, &temp_storage, &mut row_builder)
623-
.map_err(DataflowError::from)
624-
.transpose()
625-
})
690+
.render(
691+
&prev_keyed,
692+
&next_input,
693+
shutdown_token,
694+
move |key, old, new| {
695+
let key = key.into_row(&mut key_buf, key_types.as_deref());
696+
let old = old.into_row(&mut old_buf, prev_types.as_deref());
697+
let new = new.into_row(&mut new_buf, next_types.as_deref());
698+
699+
let temp_storage = RowArena::new();
700+
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
701+
closure
702+
.apply(&mut datums_local, &temp_storage, &mut row_builder)
703+
.map_err(DataflowError::from)
704+
.transpose()
705+
},
706+
)
626707
.inner
627708
.ok_err(|(x, t, d)| {
628709
// TODO(mcsherry): consider `ok_err()` for `Collection`.
@@ -634,17 +715,22 @@ where
634715

635716
(oks.as_collection(), Some(err.as_collection()))
636717
} else {
637-
let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| {
638-
let key = key.into_row(&mut key_buf, key_types.as_deref());
639-
let old = old.into_row(&mut old_buf, prev_types.as_deref());
640-
let new = new.into_row(&mut new_buf, next_types.as_deref());
641-
642-
let temp_storage = RowArena::new();
643-
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
644-
closure
645-
.apply(&mut datums_local, &temp_storage, &mut row_builder)
646-
.expect("Closure claimed to never error")
647-
});
718+
let oks = join_spec.render(
719+
&prev_keyed,
720+
&next_input,
721+
shutdown_token,
722+
move |key, old, new| {
723+
let key = key.into_row(&mut key_buf, key_types.as_deref());
724+
let old = old.into_row(&mut old_buf, prev_types.as_deref());
725+
let new = new.into_row(&mut new_buf, next_types.as_deref());
726+
727+
let temp_storage = RowArena::new();
728+
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
729+
closure
730+
.apply(&mut datums_local, &temp_storage, &mut row_builder)
731+
.expect("Closure claimed to never error")
732+
},
733+
);
648734

649735
(oks, None)
650736
}

0 commit comments

Comments
 (0)