From 67c9dcb55f7eb0c2ca6726a6dda0e222dbe0976a Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Tue, 24 Oct 2023 13:25:17 +0200 Subject: [PATCH] Allow join yielding by work and time simultaneously This commit extends the `YieldSpec` type and the syntax allowed for the `linear_join_yielding` system var to enable specifying yield strategies that consider both the performed work and the elapsed time. This will allow us to make sure that (a) join operators don't keep around huge amounts of output records and (b) join operators don't regress interactivity. --- src/adapter/src/flags.rs | 33 ++++++++++++++-------- src/buf.yaml | 2 ++ src/compute-types/src/dataflows.proto | 8 +++--- src/compute-types/src/dataflows.rs | 33 +++++++++------------- src/compute/src/render/join/linear_join.rs | 31 ++++++++++++++------ src/sql/src/session/vars.rs | 4 ++- 6 files changed, 67 insertions(+), 44 deletions(-) diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index ed41e768c8c20..212e69d413023 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -44,19 +44,30 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters { } fn parse_yield_spec(s: &str) -> Option { - let parts: Vec<_> = s.split(':').collect(); - match &parts[..] { - ["work", amount] => { - let amount = amount.parse().ok()?; - Some(YieldSpec::ByWork(amount)) - } - ["time", millis] => { - let millis = millis.parse().ok()?; - let duration = Duration::from_millis(millis); - Some(YieldSpec::ByTime(duration)) + let mut after_work = None; + let mut after_time = None; + + let options = s.split(',').map(|o| o.trim()); + for option in options { + let parts: Vec<_> = option.split(':').map(|p| p.trim()).collect(); + match &parts[..] { + ["work", amount] => { + let amount = amount.parse().ok()?; + after_work = Some(amount); + } + ["time", millis] => { + let millis = millis.parse().ok()?; + let duration = Duration::from_millis(millis); + after_time = Some(duration); + } + _ => return None, } - _ => None, } + + Some(YieldSpec { + after_work, + after_time, + }) } /// Return the current storage configuration, derived from the system configuration. diff --git a/src/buf.yaml b/src/buf.yaml index 50b83902fdc54..ad5c14e84772b 100644 --- a/src/buf.yaml +++ b/src/buf.yaml @@ -25,6 +25,8 @@ breaking: - compute-client/src/protocol/response.proto # reason: does currently not require backward-compatibility - compute-client/src/service.proto + # reason: does currently not require backward-compatibility + - compute-types/src/dataflows.proto # reason: Ignore because plans are currently not persisted. - expr/src/relation.proto # reason: still under active development diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto index 776690c6c8865..b1a84613f96f3 100644 --- a/src/compute-types/src/dataflows.proto +++ b/src/compute-types/src/dataflows.proto @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +// buf breaking: ignore (does currently not require backward-compatibility) + syntax = "proto3"; import "compute-types/src/plan.proto"; @@ -66,8 +68,6 @@ message ProtoBuildDesc { } message ProtoYieldSpec { - oneof kind { - uint64 by_work = 1; - mz_proto.ProtoDuration by_time = 2; - } + optional uint64 after_work = 1; + optional mz_proto.ProtoDuration after_time = 2; } diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index 7a1c748d2c897..c7bc5f072914f 100644 --- a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -643,33 +643,26 @@ impl RustType for BuildDesc { /// Specification of a dataflow operator's yielding behavior. #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)] -pub enum YieldSpec { - ByWork(usize), - ByTime(Duration), +pub struct YieldSpec { + /// Yield after the given amount of work was performed. + pub after_work: Option, + /// Yield after the given amount of time has elapsed. + pub after_time: Option, } impl RustType for YieldSpec { fn into_proto(&self) -> ProtoYieldSpec { - use proto_yield_spec::Kind; - - let kind = match *self { - Self::ByWork(w) => Kind::ByWork(w.into_proto()), - Self::ByTime(t) => Kind::ByTime(t.into_proto()), - }; - ProtoYieldSpec { kind: Some(kind) } + ProtoYieldSpec { + after_work: self.after_work.into_proto(), + after_time: self.after_time.into_proto(), + } } fn from_proto(proto: ProtoYieldSpec) -> Result { - use proto_yield_spec::Kind; - - let Some(kind) = proto.kind else { - return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind")); - }; - let spec = match kind { - Kind::ByWork(w) => Self::ByWork(w.into_rust()?), - Kind::ByTime(t) => Self::ByTime(t.into_rust()?), - }; - Ok(spec) + Ok(Self { + after_work: proto.after_work.into_rust()?, + after_time: proto.after_time.into_rust()?, + }) } } diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index 939b54578103d..6a59bc3ae78fb 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -61,7 +61,10 @@ impl Default for LinearJoinSpec { fn default() -> Self { Self { implementation: LinearJoinImpl::Materialize, - yielding: YieldSpec::ByWork(1_000_000), + yielding: YieldSpec { + after_work: Some(1_000_000), + after_time: None, + }, } } } @@ -88,18 +91,30 @@ impl LinearJoinSpec { V2: Data, { use LinearJoinImpl::*; - use YieldSpec::*; - match (self.implementation, self.yielding) { - (DifferentialDataflow, _) => { + match ( + self.implementation, + self.yielding.after_work, + self.yielding.after_time, + ) { + (DifferentialDataflow, _, _) => { differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result) } - (Materialize, ByWork(limit)) => { - let yield_fn = move |_start, work| work >= limit; + (Materialize, Some(work_limit), Some(time_limit)) => { + let yield_fn = + move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit; + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) + } + (Materialize, Some(work_limit), None) => { + let yield_fn = move |_start, work| work >= work_limit; + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) + } + (Materialize, None, Some(time_limit)) => { + let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit; mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } - (Materialize, ByTime(limit)) => { - let yield_fn = move |start: Instant, _work| start.elapsed() >= limit; + (Materialize, None, None) => { + let yield_fn = |_start, _work| false; mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } } diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 1c231faaab971..95c7a610b88ed 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1311,7 +1311,9 @@ static LINEAR_JOIN_YIELDING: Lazy> = Lazy::new(|| ServerVar { value: &DEFAULT_LINEAR_JOIN_YIELDING, description: "The yielding behavior compute rendering should apply for linear join operators. Either \ - 'work:' or 'time:'.", + 'work:' or 'time:' or 'work:,time:'. Note \ + that omitting one of 'work' or 'time' will entirely disable join yielding by time or \ + work, respectively, rather than falling back to some default.", internal: true, });