From 98b3c85bf34152c722c99831f90e6e584eb11d12 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 8 Mar 2023 13:20:56 +0100 Subject: [PATCH] timely-util: async output handles This PR replaces the synchronous timely handles with async ones. Their main benefit is that it is totally fine to hold handles active across await points which produces both better ergonomics and more efficient batch creation for the cases where we produce messages one by one. All the methods on the handle that send data are marked `async` so that the handle can automatically trigger a yield. The fuel system is not implemented in this PR but it will follow soon after this merges. Signed-off-by: Petros Angelatos --- src/compute/src/sink/persist_sink.rs | 8 +- .../src/operators/shard_source.rs | 12 +- src/storage/src/decode/mod.rs | 17 +- .../src/source/source_reader_pipeline.rs | 73 +++------ src/timely-util/src/builder_async.rs | 148 ++++++++++++++++-- src/timely-util/src/operator.rs | 16 +- 6 files changed, 187 insertions(+), 87 deletions(-) diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs index a806558248d63..8b3d6fc3da561 100644 --- a/src/compute/src/sink/persist_sink.rs +++ b/src/compute/src/sink/persist_sink.rs @@ -460,9 +460,7 @@ where batch_description ); - let mut output = output.activate(); - let mut session = output.session(&cap); - session.give(batch_description); + output.give(&cap, batch_description).await; // WIP: We downgrade our capability so that downstream // operators (writer and appender) can know when all the @@ -823,9 +821,7 @@ where } }; - let mut output = output.activate(); - let mut session = output.session(&cap); - session.give(batch_or_data); + output.give(&cap, batch_or_data).await; } } } else { diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index da7875755d1b1..06f92890039bb 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ 
b/src/persist-client/src/operators/shard_source.rs @@ -309,8 +309,6 @@ where } Some(Ok(ListenEvent::Progress(progress))) => { let session_cap = cap_set.delayed(&current_ts); - let mut descs_output = descs_output.activate(); - let mut descs_session = descs_output.session(&session_cap); // NB: in order to play nice with downstream operators whose invariants // depend on seeing the full contents of an individual batch, we must @@ -328,7 +326,7 @@ where // doing instead here, but this has seemed to work // okay so far. Continue to revisit as necessary. let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers; - descs_session.give((worker_idx, part_desc.into_exchangeable_part())); + descs_output.give(&session_cap, (worker_idx, part_desc.into_exchangeable_part())).await; } bytes_emitted }; @@ -480,12 +478,10 @@ where // outputs or sessions across await points, which // would prevent messages from being flushed from // the shared timely output buffer. - let mut fetched_output = fetched_output.activate(); - let mut tokens_output = tokens_output.activate(); - fetched_output.session(&cap).give(fetched); + fetched_output.give(&cap, fetched).await; tokens_output - .session(&cap) - .give(token.into_exchangeable_part()); + .give(&cap, token.into_exchangeable_part()) + .await; } } } diff --git a/src/storage/src/decode/mod.rs b/src/storage/src/decode/mod.rs index 4a43e4a5a6a2c..bc06d83f32457 100644 --- a/src/storage/src/decode/mod.rs +++ b/src/storage/src/decode/mod.rs @@ -582,9 +582,8 @@ where headers.as_deref(), ); - let mut output = output.activate(); if value_bytes_remaining.is_empty() { - output.session(&cap).give(DecodeResult { + let result = DecodeResult { key: None, value: Some(value.map(|r| (r, 1)).map_err(|inner| { DecodeError { @@ -596,11 +595,12 @@ where upstream_time_millis, partition: partition.clone(), metadata, - }); + }; + output.give(&cap, result).await; value_buf = vec![]; break; } else { - output.session(&cap).give(DecodeResult { + let result = 
DecodeResult { key: None, value: Some(value.map(|r| (r, 1)).map_err(|inner| { DecodeError { @@ -612,7 +612,8 @@ where upstream_time_millis, partition: partition.clone(), metadata, - }); + }; + output.give(&cap, result).await; } if is_err { // If decoding has gone off the rails, we can no longer be sure that the delimiters are correct, so it @@ -650,8 +651,7 @@ where headers.as_deref(), ); - let mut output = output.activate(); - output.session(&cap).give(DecodeResult { + let result = DecodeResult { key: None, value: Some(value.map(|r| (r, 1)).map_err(|inner| DecodeError { kind: inner, @@ -661,7 +661,8 @@ where upstream_time_millis, partition: partition.clone(), metadata, - }); + }; + output.give(&cap, result).await; } } } diff --git a/src/storage/src/source/source_reader_pipeline.rs b/src/storage/src/source/source_reader_pipeline.rs index 52f74bb7d5495..6c3770dfe0c4d 100644 --- a/src/storage/src/source/source_reader_pipeline.rs +++ b/src/storage/src/source/source_reader_pipeline.rs @@ -44,7 +44,6 @@ use timely::dataflow::channels::pact::{Exchange, Pipeline}; use timely::dataflow::channels::pushers::Tee; use timely::dataflow::operators::capture::capture::Capture; use timely::dataflow::operators::capture::Event; -use timely::dataflow::operators::generic::OutputHandle; use timely::dataflow::operators::{Broadcast, CapabilitySet, Partition}; use timely::dataflow::{Scope, Stream}; use timely::progress::frontier::MutableAntichain; @@ -68,7 +67,9 @@ use mz_storage_client::types::errors::SourceError; use mz_storage_client::types::sources::encoding::SourceDataEncoding; use mz_storage_client::types::sources::{MzOffset, SourceConnection, SourceTimestamp, SourceToken}; use mz_timely_util::antichain::AntichainExt; -use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder}; +use mz_timely_util::builder_async::{ + AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, +}; use 
mz_timely_util::capture::UnboundedTokioCapture; use mz_timely_util::operator::StreamExt as _; @@ -366,10 +367,10 @@ where tokio::pin!(source_stream); tokio::pin!(resume_uppers); - health_output.activate().session(&health_cap).give((worker_id, HealthStatusUpdate { + health_output.give(&health_cap, (worker_id, HealthStatusUpdate { update: HealthStatus::Starting, should_halt: false, - })); + })).await; let mut prev_status = HealthStatusUpdate { update: HealthStatus::Starting, should_halt: false, @@ -412,7 +413,6 @@ where // We want to efficiently batch up messages that are ready. To do that we will // activate the output handle here and then drain the currently available // messages until we either run out of messages or run out of time. - let mut output = output.activate(); while timer.elapsed() < YIELD_INTERVAL { match source_stream.next().now_or_never() { Some(Some(SourceMessageType::Message(message, cap, diff))) => { @@ -430,10 +430,7 @@ where if prev_status != new_status_update { prev_status = new_status_update.clone(); - health_output - .activate() - .session(&health_cap) - .give((worker_id, new_status_update)); + health_output.give(&health_cap, (worker_id, new_status_update)).await; } if let Ok(message) = &message { @@ -448,7 +445,7 @@ where // If cap is not beyond emit_cap we can't re-use emit_cap so // flush the current batch Some(emit_cap) => if !PartialOrder::less_equal(emit_cap, &cap) { - output.session(&emit_cap).give_container(&mut batch); + output.give_container(&*emit_cap, &mut batch).await; batch.clear(); *emit_cap = cap; }, @@ -458,12 +455,12 @@ where } Some(Some(SourceMessageType::SourceStatus(new_status))) => { prev_status = new_status.clone(); - health_output.activate().session(&health_cap).give((worker_id, new_status)); + health_output.give(&health_cap, (worker_id, new_status)).await; } Some(None) => { trace!("timely-{worker_id} source({id}): source ended, dropping capabilities"); if let Some(emit_cap) = emit_cap.take() { - 
output.session(&emit_cap).give_container(&mut batch); + output.give_container(&emit_cap, &mut batch).await; batch.clear(); } return; @@ -472,14 +469,10 @@ where } } if let Some(emit_cap) = emit_cap.take() { - output.session(&emit_cap).give_container(&mut batch); + output.give_container(&emit_cap, &mut batch).await; batch.clear(); } assert!(batch.is_empty()); - // Now we drop the activated output handle to force timely to emit any pending - // batch. It's crucial that this happens before our attempt to yield otherwise - // the buffer would get stuck in this operator. - drop(output); if timer.elapsed() > YIELD_INTERVAL { tokio::task::yield_now().await; } @@ -751,15 +744,10 @@ where &initial_batch.updates ); - // Out of an abundance of caution, do not hold the output handle - // across an await, and drop it before we downgrade the capability. - { - let mut remap_output = remap_output.activate(); - let cap = cap_set.delayed(cap_set.first().unwrap()); - let mut session = remap_output.session(&cap); - session.give_vec(&mut initial_batch.updates); - cap_set.downgrade(initial_batch.upper); - } + let cap = cap_set.delayed(cap_set.first().unwrap()); + remap_output.give_container(&cap, &mut initial_batch.updates).await; + drop(cap); + cap_set.downgrade(initial_batch.upper); let mut ticker = tokio::time::interval(timestamp_interval); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -780,14 +768,8 @@ where remap_trace_batch.upper.pretty() ); - // Out of an abundance of caution, do not hold the output handle - // across an await, and drop it before we downgrade the capability. 
- { - let mut remap_output = remap_output.activate(); - let cap = cap_set.delayed(cap_set.first().unwrap()); - let mut session = remap_output.session(&cap); - session.give_vec(&mut remap_trace_batch.updates); - } + let cap = cap_set.delayed(cap_set.first().unwrap()); + remap_output.give_container(&cap, &mut remap_trace_batch.updates).await; // If the last remap trace closed the input, we no longer // need to (or can) advance the timestamper. @@ -799,14 +781,8 @@ where let mut remap_trace_batch = timestamper.advance().await; - // Out of an abundance of caution, do not hold the output handle - // across an await, and drop it before we downgrade the capability. - { - let mut remap_output = remap_output.activate(); - let cap = cap_set.delayed(cap_set.first().unwrap()); - let mut session = remap_output.session(&cap); - session.give_vec(&mut remap_trace_batch.updates); - } + let cap = cap_set.delayed(cap_set.first().unwrap()); + remap_output.give_container(&cap, &mut remap_trace_batch.updates).await; cap_set.downgrade(remap_trace_batch.upper); } @@ -981,7 +957,6 @@ where // Accumulate updates to offsets for Prometheus and system table metrics collection let mut metric_updates = BTreeMap::new(); - let mut output = reclocked_output.activate(); let mut total_processed = 0; for ((message, from_ts, diff), into_ts) in timestamper.reclock(msgs) { let into_ts = into_ts.expect("reclock for update not beyond upper failed"); @@ -991,11 +966,11 @@ where diff, &mut bytes_read, &cap_set, - &mut output, + &mut reclocked_output, &mut metric_updates, into_ts, id, - ); + ).await; total_processed += 1; } // The loop above might have completely emptied batches. We can now remove them @@ -1140,15 +1115,15 @@ where /// /// TODO: This function is a bit of a mess rn but hopefully this function makes /// the existing mess more obvious and points towards ways to improve it. 
-fn handle_message( +async fn handle_message( message: Result, SourceReaderError>, time: T, diff: D, bytes_read: &mut usize, cap_set: &CapabilitySet, - output_handle: &mut OutputHandle< + output_handle: &mut AsyncOutputHandle< mz_repr::Timestamp, - (usize, Result, SourceError>), + Vec<(usize, Result, SourceError>)>, Tee, SourceError>)>, >, metric_updates: &mut BTreeMap, @@ -1199,7 +1174,7 @@ fn handle_message( }; let ts_cap = cap_set.delayed(&ts); - output_handle.session(&ts_cap).give(output); + output_handle.give(&ts_cap, output).await; match metric_updates.entry(partition) { Entry::Occupied(mut entry) => { entry.insert((offset, ts, entry.get().2 + 1)); diff --git a/src/timely-util/src/builder_async.rs b/src/timely-util/src/builder_async.rs index ebd04065a2ece..21d5d7de3e2d2 100644 --- a/src/timely-util/src/builder_async.rs +++ b/src/timely-util/src/builder_async.rs @@ -22,16 +22,20 @@ use differential_dataflow::operators::arrange::agent::ShutdownButton; use futures_util::task::ArcWake; use futures_util::FutureExt; use polonius_the_crab::{polonius, WithLifetime}; -use timely::communication::{message::RefOrMut, Pull}; +use timely::communication::{message::RefOrMut, Pull, Push}; use timely::dataflow::channels::pact::ParallelizationContractCore; +use timely::dataflow::channels::pushers::buffer::Session; +use timely::dataflow::channels::pushers::counter::CounterCore as PushCounter; use timely::dataflow::channels::pushers::TeeCore; use timely::dataflow::channels::BundleCore; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; +use timely::dataflow::operators::generic::OutputHandleCore; use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputWrapper}; use timely::dataflow::operators::{Capability, InputCapability}; use timely::dataflow::{Scope, StreamCore}; use timely::progress::{Antichain, Timestamp}; use timely::scheduling::{Activator, SyncActivator}; +use timely::Data; use timely::{Container, 
PartialOrder}; /// Builds async operators with generic shape. @@ -46,9 +50,12 @@ pub struct OperatorBuilder { activator: Activator, /// The waker set up to activate this timely operator when woken operator_waker: Arc, - /// Holds type erased closures that should drain a handle when called. These handles will be + /// Holds type erased closures that drain an input handle when called. These handles will be /// automatically drained when the operator is scheduled and the logic future has exited drain_pipe: Rc>>>, + /// Holds type erased closures that flush an output handle when called. These handles will be + /// automatically drained when the operator is scheduled after the logic future has been polled + output_flushes: Vec>, } /// An async Waker that activates a specific operator when woken and marks the task as ready @@ -212,6 +219,115 @@ impl<'handle, T: Timestamp, D: Container, P: Pull>> Future } } +// TODO: delete and use CapabilityTrait instead once TimelyDataflow/timely-dataflow#512 gets merged +pub trait CapabilityTrait { + fn session<'a, D, P>( + &'a self, + handle: &'a mut OutputHandleCore<'_, T, D, P>, + ) -> Session<'a, T, D, PushCounter> + where + D: Container, + P: Push>; +} + +impl CapabilityTrait for InputCapability { + #[inline] + fn session<'a, D, P>( + &'a self, + handle: &'a mut OutputHandleCore<'_, T, D, P>, + ) -> Session<'a, T, D, PushCounter> + where + D: Container, + P: Push>, + { + handle.session(self) + } +} + +impl CapabilityTrait for Capability { + #[inline] + fn session<'a, D, P>( + &'a self, + handle: &'a mut OutputHandleCore<'_, T, D, P>, + ) -> Session<'a, T, D, PushCounter> + where + D: Container, + P: Push>, + { + handle.session(self) + } +} + +pub struct AsyncOutputHandle> + 'static> { + // The field order is important here as the handle is borrowing from the wrapper. 
See also the + // safety argument in the constructor + handle: Rc>>, + wrapper: Rc>>>, +} + +impl AsyncOutputHandle +where + T: Timestamp, + D: Container, + P: Push> + 'static, +{ + fn new(wrapper: OutputWrapper) -> Self { + let mut wrapper = Box::pin(wrapper); + // SAFETY: + // get_unchecked_mut is safe because we are not moving the wrapper + // + // transmute is safe because: + // * We're erasing the lifetime but we guarantee through field order that the handle will + // be dropped before the wrapper, thus manually enforcing the lifetime. + // * We never touch wrapper again after this point + let handle = unsafe { + let handle = wrapper.as_mut().get_unchecked_mut().activate(); + std::mem::transmute::, OutputHandleCore<'static, T, D, P>>( + handle, + ) + }; + Self { + handle: Rc::new(RefCell::new(handle)), + wrapper: Rc::new(wrapper), + } + } + + #[allow(clippy::unused_async)] + #[inline] + pub async fn give_container>(&mut self, cap: &C, container: &mut D) { + let mut handle = self.handle.borrow_mut(); + cap.session(&mut handle).give_container(container); + } + + fn cease(&mut self) { + self.handle.borrow_mut().cease() + } +} + +impl<'a, T, D, P> AsyncOutputHandle, P> +where + T: Timestamp, + D: Data, + P: Push>> + 'static, +{ + #[allow(clippy::unused_async)] + pub async fn give>(&mut self, cap: &C, data: D) { + let mut handle = self.handle.borrow_mut(); + cap.session(&mut handle).give(data); + } +} + +impl> + 'static> Clone + for AsyncOutputHandle +{ + fn clone(&self) -> Self { + Self { + handle: Rc::clone(&self.handle), + wrapper: Rc::clone(&self.wrapper), + } + } +} + impl OperatorBuilder { /// Allocates a new generic async operator builder from its containing scope. 
pub fn new(name: String, scope: G) -> Self { @@ -232,6 +348,7 @@ impl OperatorBuilder { activator, operator_waker: Arc::new(operator_waker), drain_pipe: Default::default(), + output_flushes: Default::default(), } } @@ -291,10 +408,12 @@ impl OperatorBuilder { pub fn new_output( &mut self, ) -> ( - OutputWrapper>, + AsyncOutputHandle>, StreamCore, ) { - self.builder.new_output() + let connection = + vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; + self.new_output_connection(connection) } /// Adds a new output with connetion information, returning the output handle and stream. @@ -311,10 +430,18 @@ impl OperatorBuilder { &mut self, connection: Vec::Summary>>, ) -> ( - OutputWrapper>, + AsyncOutputHandle>, StreamCore, ) { - self.builder.new_output_connection(connection) + let (wrapper, stream) = self.builder.new_output_connection(connection); + + let handle = AsyncOutputHandle::new(wrapper); + + let mut flush_handle = handle.clone(); + self.output_flushes + .push(Box::new(move || flush_handle.cease())); + + (handle, stream) } /// Creates an operator implementation from supplied logic constructor. 
It returns a shutdown @@ -330,6 +457,7 @@ impl OperatorBuilder { let registered_wakers = self.registered_wakers; let shared_frontiers = self.shared_frontiers; let drain_pipe = self.drain_pipe; + let mut output_flushes = self.output_flushes; let token = Rc::new(RefCell::new(Some(()))); let button = ShutdownButton::new(Rc::clone(&token), self.activator); self.builder.build_reschedule(move |caps| { @@ -374,6 +502,10 @@ impl OperatorBuilder { // We're done with logic so deallocate the task logic_fut = None; } + // Flush all the outputs before exiting + for flush in output_flushes.iter_mut() { + (flush)(); + } } } // The timely operator needs to be kept alive if the task is pending @@ -429,9 +561,7 @@ mod test { let cap = cap.retain(); for item in data.iter().copied() { tokio::time::sleep(Duration::from_millis(10)).await; - let mut output_handle = output.activate(); - let mut session = output_handle.session(&cap); - session.give(item); + output.give(&cap, item).await; } } Event::Progress(_frontier) => {} diff --git a/src/timely-util/src/operator.rs b/src/timely-util/src/operator.rs index bf50b6fd563b9..6c699a6b1d3fc 100644 --- a/src/timely-util/src/operator.rs +++ b/src/timely-util/src/operator.rs @@ -19,14 +19,16 @@ use timely::dataflow::channels::pushers::Tee; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; use timely::dataflow::operators::generic::{ operator::{self, Operator}, - InputHandle, OperatorInfo, OutputHandle, OutputWrapper, + InputHandle, OperatorInfo, OutputHandle, }; use timely::dataflow::operators::Capability; use timely::dataflow::{Scope, Stream}; use timely::Data; use crate::buffer::ConsolidateBuffer; -use crate::builder_async::{AsyncInputHandle, OperatorBuilder as OperatorBuilderAsync}; +use crate::builder_async::{ + AsyncInputHandle, AsyncOutputHandle, OperatorBuilder as OperatorBuilderAsync, +}; /// Extension methods for timely [`Stream`]s. 
pub trait StreamExt @@ -75,7 +77,7 @@ where Capability, OperatorInfo, AsyncInputHandle, P::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P: ParallelizationContract; @@ -99,7 +101,7 @@ where OperatorInfo, AsyncInputHandle, P1::Puller>, AsyncInputHandle, P2::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P1: ParallelizationContract, @@ -253,7 +255,7 @@ where Capability, OperatorInfo, AsyncInputHandle, P::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P: ParallelizationContract, @@ -289,7 +291,7 @@ where OperatorInfo, AsyncInputHandle, P1::Puller>, AsyncInputHandle, P2::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P1: ParallelizationContract, @@ -451,7 +453,7 @@ where B: FnOnce( Capability, OperatorInfo, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, {