diff --git a/src/compute/src/sink/persist_sink.rs b/src/compute/src/sink/persist_sink.rs index a806558248d63..8b3d6fc3da561 100644 --- a/src/compute/src/sink/persist_sink.rs +++ b/src/compute/src/sink/persist_sink.rs @@ -460,9 +460,7 @@ where batch_description ); - let mut output = output.activate(); - let mut session = output.session(&cap); - session.give(batch_description); + output.give(&cap, batch_description).await; // WIP: We downgrade our capability so that downstream // operators (writer and appender) can know when all the @@ -823,9 +821,7 @@ where } }; - let mut output = output.activate(); - let mut session = output.session(&cap); - session.give(batch_or_data); + output.give(&cap, batch_or_data).await; } } } else { diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index da7875755d1b1..06f92890039bb 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -309,8 +309,6 @@ where } Some(Ok(ListenEvent::Progress(progress))) => { let session_cap = cap_set.delayed(¤t_ts); - let mut descs_output = descs_output.activate(); - let mut descs_session = descs_output.session(&session_cap); // NB: in order to play nice with downstream operators whose invariants // depend on seeing the full contents of an individual batch, we must @@ -328,7 +326,7 @@ where // doing instead here, but this has seemed to work // okay so far. Continue to revisit as necessary. let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers; - descs_session.give((worker_idx, part_desc.into_exchangeable_part())); + descs_output.give(&session_cap, (worker_idx, part_desc.into_exchangeable_part())).await; } bytes_emitted }; @@ -480,12 +478,10 @@ where // outputs or sessions across await points, which // would prevent messages from being flushed from // the shared timely output buffer. - let mut fetched_output = fetched_output.activate(); - let mut tokens_output = tokens_output.activate(); - fetched_output.session(&cap).give(fetched); + fetched_output.give(&cap, fetched).await; tokens_output - .session(&cap) - .give(token.into_exchangeable_part()); + .give(&cap, token.into_exchangeable_part()) + .await; } } } diff --git a/src/storage/src/decode/mod.rs b/src/storage/src/decode/mod.rs index 4a43e4a5a6a2c..bc06d83f32457 100644 --- a/src/storage/src/decode/mod.rs +++ b/src/storage/src/decode/mod.rs @@ -582,9 +582,8 @@ where headers.as_deref(), ); - let mut output = output.activate(); if value_bytes_remaining.is_empty() { - output.session(&cap).give(DecodeResult { + let result = DecodeResult { key: None, value: Some(value.map(|r| (r, 1)).map_err(|inner| { DecodeError { @@ -596,11 +595,12 @@ where upstream_time_millis, partition: partition.clone(), metadata, - }); + }; + output.give(&cap, result).await; value_buf = vec![]; break; } else { - output.session(&cap).give(DecodeResult { + let result = DecodeResult { key: None, value: Some(value.map(|r| (r, 1)).map_err(|inner| { DecodeError { @@ -612,7 +612,8 @@ where upstream_time_millis, partition: partition.clone(), metadata, - }); + }; + output.give(&cap, result).await; } if is_err { // If decoding has gone off the rails, we can no longer be sure that the delimiters are correct, so it @@ -650,8 +651,7 @@ where headers.as_deref(), ); - let mut output = output.activate(); - output.session(&cap).give(DecodeResult { + let result = DecodeResult { key: None, value: Some(value.map(|r| (r, 1)).map_err(|inner| DecodeError { kind: inner, @@ -661,7 +661,8 @@ where upstream_time_millis, partition: partition.clone(), metadata, - }); + }; + output.give(&cap, result).await; } } } diff --git a/src/storage/src/source/source_reader_pipeline.rs b/src/storage/src/source/source_reader_pipeline.rs index 52f74bb7d5495..6c3770dfe0c4d 100644 --- a/src/storage/src/source/source_reader_pipeline.rs +++ b/src/storage/src/source/source_reader_pipeline.rs @@ -44,7 +44,6 @@ use timely::dataflow::channels::pact::{Exchange, Pipeline}; use timely::dataflow::channels::pushers::Tee; use timely::dataflow::operators::capture::capture::Capture; use timely::dataflow::operators::capture::Event; -use timely::dataflow::operators::generic::OutputHandle; use timely::dataflow::operators::{Broadcast, CapabilitySet, Partition}; use timely::dataflow::{Scope, Stream}; use timely::progress::frontier::MutableAntichain; @@ -68,7 +67,9 @@ use mz_storage_client::types::errors::SourceError; use mz_storage_client::types::sources::encoding::SourceDataEncoding; use mz_storage_client::types::sources::{MzOffset, SourceConnection, SourceTimestamp, SourceToken}; use mz_timely_util::antichain::AntichainExt; -use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder}; +use mz_timely_util::builder_async::{ + AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, +}; use mz_timely_util::capture::UnboundedTokioCapture; use mz_timely_util::operator::StreamExt as _; @@ -366,10 +367,10 @@ where tokio::pin!(source_stream); tokio::pin!(resume_uppers); - health_output.activate().session(&health_cap).give((worker_id, HealthStatusUpdate { + health_output.give(&health_cap, (worker_id, HealthStatusUpdate { update: HealthStatus::Starting, should_halt: false, - })); + })).await; let mut prev_status = HealthStatusUpdate { update: HealthStatus::Starting, should_halt: false, @@ -412,7 +413,6 @@ where // We want to efficiently batch up messages that are ready. To do that we will // activate the output handle here and then drain the currently available // messages until we either run out of messages or run out of time. - let mut output = output.activate(); while timer.elapsed() < YIELD_INTERVAL { match source_stream.next().now_or_never() { Some(Some(SourceMessageType::Message(message, cap, diff))) => { @@ -430,10 +430,7 @@ where if prev_status != new_status_update { prev_status = new_status_update.clone(); - health_output - .activate() - .session(&health_cap) - .give((worker_id, new_status_update)); + health_output.give(&health_cap, (worker_id, new_status_update)).await; } if let Ok(message) = &message { @@ -448,7 +445,7 @@ where // If cap is not beyond emit_cap we can't re-use emit_cap so // flush the current batch Some(emit_cap) => if !PartialOrder::less_equal(emit_cap, &cap) { - output.session(&emit_cap).give_container(&mut batch); + output.give_container(&*emit_cap, &mut batch).await; batch.clear(); *emit_cap = cap; }, @@ -458,12 +455,12 @@ where } Some(Some(SourceMessageType::SourceStatus(new_status))) => { prev_status = new_status.clone(); - health_output.activate().session(&health_cap).give((worker_id, new_status)); + health_output.give(&health_cap, (worker_id, new_status)).await; } Some(None) => { trace!("timely-{worker_id} source({id}): source ended, dropping capabilities"); if let Some(emit_cap) = emit_cap.take() { - output.session(&emit_cap).give_container(&mut batch); + output.give_container(&emit_cap, &mut batch).await; batch.clear(); } return; @@ -472,14 +469,10 @@ where } } if let Some(emit_cap) = emit_cap.take() { - output.session(&emit_cap).give_container(&mut batch); + output.give_container(&emit_cap, &mut batch).await; batch.clear(); } assert!(batch.is_empty()); - // Now we drop the activated output handle to force timely to emit any pending - // batch. It's crucial that this happens before our attempt to yield otherwise - // the buffer would get stuck in this operator. - drop(output); if timer.elapsed() > YIELD_INTERVAL { tokio::task::yield_now().await; } @@ -751,15 +744,10 @@ where &initial_batch.updates ); - // Out of an abundance of caution, do not hold the output handle - // across an await, and drop it before we downgrade the capability. - { - let mut remap_output = remap_output.activate(); - let cap = cap_set.delayed(cap_set.first().unwrap()); - let mut session = remap_output.session(&cap); - session.give_vec(&mut initial_batch.updates); - cap_set.downgrade(initial_batch.upper); - } + let cap = cap_set.delayed(cap_set.first().unwrap()); + remap_output.give_container(&cap, &mut initial_batch.updates).await; + drop(cap); + cap_set.downgrade(initial_batch.upper); let mut ticker = tokio::time::interval(timestamp_interval); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -780,14 +768,8 @@ where remap_trace_batch.upper.pretty() ); - // Out of an abundance of caution, do not hold the output handle - // across an await, and drop it before we downgrade the capability. - { - let mut remap_output = remap_output.activate(); - let cap = cap_set.delayed(cap_set.first().unwrap()); - let mut session = remap_output.session(&cap); - session.give_vec(&mut remap_trace_batch.updates); - } + let cap = cap_set.delayed(cap_set.first().unwrap()); + remap_output.give_container(&cap, &mut remap_trace_batch.updates).await; // If the last remap trace closed the input, we no longer // need to (or can) advance the timestamper. @@ -799,14 +781,8 @@ where let mut remap_trace_batch = timestamper.advance().await; - // Out of an abundance of caution, do not hold the output handle - // across an await, and drop it before we downgrade the capability. - { - let mut remap_output = remap_output.activate(); - let cap = cap_set.delayed(cap_set.first().unwrap()); - let mut session = remap_output.session(&cap); - session.give_vec(&mut remap_trace_batch.updates); - } + let cap = cap_set.delayed(cap_set.first().unwrap()); + remap_output.give_container(&cap, &mut remap_trace_batch.updates).await; cap_set.downgrade(remap_trace_batch.upper); } @@ -981,7 +957,6 @@ where // Accumulate updates to offsets for Prometheus and system table metrics collection let mut metric_updates = BTreeMap::new(); - let mut output = reclocked_output.activate(); let mut total_processed = 0; for ((message, from_ts, diff), into_ts) in timestamper.reclock(msgs) { let into_ts = into_ts.expect("reclock for update not beyond upper failed"); @@ -991,11 +966,11 @@ where diff, &mut bytes_read, &cap_set, - &mut output, + &mut reclocked_output, &mut metric_updates, into_ts, id, - ); + ).await; total_processed += 1; } // The loop above might have completely emptied batches. We can now remove them @@ -1140,15 +1115,15 @@ where /// /// TODO: This function is a bit of a mess rn but hopefully this function makes /// the existing mess more obvious and points towards ways to improve it. -fn handle_message( +async fn handle_message( message: Result, SourceReaderError>, time: T, diff: D, bytes_read: &mut usize, cap_set: &CapabilitySet, - output_handle: &mut OutputHandle< + output_handle: &mut AsyncOutputHandle< mz_repr::Timestamp, - (usize, Result, SourceError>), + Vec<(usize, Result, SourceError>)>, Tee, SourceError>)>, >, metric_updates: &mut BTreeMap, @@ -1199,7 +1174,7 @@ fn handle_message( }; let ts_cap = cap_set.delayed(&ts); - output_handle.session(&ts_cap).give(output); + output_handle.give(&ts_cap, output).await; match metric_updates.entry(partition) { Entry::Occupied(mut entry) => { entry.insert((offset, ts, entry.get().2 + 1)); diff --git a/src/timely-util/src/builder_async.rs b/src/timely-util/src/builder_async.rs index ebd04065a2ece..21d5d7de3e2d2 100644 --- a/src/timely-util/src/builder_async.rs +++ b/src/timely-util/src/builder_async.rs @@ -22,16 +22,20 @@ use differential_dataflow::operators::arrange::agent::ShutdownButton; use futures_util::task::ArcWake; use futures_util::FutureExt; use polonius_the_crab::{polonius, WithLifetime}; -use timely::communication::{message::RefOrMut, Pull}; +use timely::communication::{message::RefOrMut, Pull, Push}; use timely::dataflow::channels::pact::ParallelizationContractCore; +use timely::dataflow::channels::pushers::buffer::Session; +use timely::dataflow::channels::pushers::counter::CounterCore as PushCounter; use timely::dataflow::channels::pushers::TeeCore; use timely::dataflow::channels::BundleCore; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; +use timely::dataflow::operators::generic::OutputHandleCore; use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputWrapper}; use timely::dataflow::operators::{Capability, InputCapability}; use timely::dataflow::{Scope, StreamCore}; use timely::progress::{Antichain, Timestamp}; use timely::scheduling::{Activator, SyncActivator}; +use timely::Data; use timely::{Container, PartialOrder}; /// Builds async operators with generic shape. @@ -46,9 +50,12 @@ pub struct OperatorBuilder { activator: Activator, /// The waker set up to activate this timely operator when woken operator_waker: Arc, - /// Holds type erased closures that should drain a handle when called. These handles will be + /// Holds type erased closures that drain an input handle when called. These handles will be /// automatically drained when the operator is scheduled and the logic future has exited drain_pipe: Rc>>>, + /// Holds type erased closures that flush an output handle when called. These handles will be + /// automatically drained when the operator is scheduled after the logic future has been polled + output_flushes: Vec>, } /// An async Waker that activates a specific operator when woken and marks the task as ready @@ -212,6 +219,115 @@ impl<'handle, T: Timestamp, D: Container, P: Pull>> Future } } +// TODO: delete and use CapabilityTrait instead once TimelyDataflow/timely-dataflow#512 gets merged +pub trait CapabilityTrait { + fn session<'a, D, P>( + &'a self, + handle: &'a mut OutputHandleCore<'_, T, D, P>, + ) -> Session<'a, T, D, PushCounter> + where + D: Container, + P: Push>; +} + +impl CapabilityTrait for InputCapability { + #[inline] + fn session<'a, D, P>( + &'a self, + handle: &'a mut OutputHandleCore<'_, T, D, P>, + ) -> Session<'a, T, D, PushCounter> + where + D: Container, + P: Push>, + { + handle.session(self) + } +} + +impl CapabilityTrait for Capability { + #[inline] + fn session<'a, D, P>( + &'a self, + handle: &'a mut OutputHandleCore<'_, T, D, P>, + ) -> Session<'a, T, D, PushCounter> + where + D: Container, + P: Push>, + { + handle.session(self) + } +} + +pub struct AsyncOutputHandle> + 'static> { + // The field order is important here as the handle is borrowing from the wrapper. See also the + // safety argument in the constructor + handle: Rc>>, + wrapper: Rc>>>, +} + +impl AsyncOutputHandle +where + T: Timestamp, + D: Container, + P: Push> + 'static, +{ + fn new(wrapper: OutputWrapper) -> Self { + let mut wrapper = Box::pin(wrapper); + // SAFETY: + // get_unchecked_mut is safe because we are not moving the wrapper + // + // transmute is safe because: + // * We're erasing the lifetime but we guarantee through field order that the handle will + // be dropped before the wrapper, thus manually enforcing the lifetime. + // * We never touch wrapper again after this point + let handle = unsafe { + let handle = wrapper.as_mut().get_unchecked_mut().activate(); + std::mem::transmute::, OutputHandleCore<'static, T, D, P>>( + handle, + ) + }; + Self { + handle: Rc::new(RefCell::new(handle)), + wrapper: Rc::new(wrapper), + } + } + + #[allow(clippy::unused_async)] + #[inline] + pub async fn give_container>(&mut self, cap: &C, container: &mut D) { + let mut handle = self.handle.borrow_mut(); + cap.session(&mut handle).give_container(container); + } + + fn cease(&mut self) { + self.handle.borrow_mut().cease() + } +} + +impl<'a, T, D, P> AsyncOutputHandle, P> +where + T: Timestamp, + D: Data, + P: Push>> + 'static, +{ + #[allow(clippy::unused_async)] + pub async fn give>(&mut self, cap: &C, data: D) { + let mut handle = self.handle.borrow_mut(); + cap.session(&mut handle).give(data); + } +} + +impl> + 'static> Clone + for AsyncOutputHandle +{ + fn clone(&self) -> Self { + Self { + handle: Rc::clone(&self.handle), + wrapper: Rc::clone(&self.wrapper), + } + } +} + impl OperatorBuilder { /// Allocates a new generic async operator builder from its containing scope. pub fn new(name: String, scope: G) -> Self { @@ -232,6 +348,7 @@ impl OperatorBuilder { activator, operator_waker: Arc::new(operator_waker), drain_pipe: Default::default(), + output_flushes: Default::default(), } } @@ -291,10 +408,12 @@ impl OperatorBuilder { pub fn new_output( &mut self, ) -> ( - OutputWrapper>, + AsyncOutputHandle>, StreamCore, ) { - self.builder.new_output() + let connection = + vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; + self.new_output_connection(connection) } /// Adds a new output with connetion information, returning the output handle and stream. @@ -311,10 +430,18 @@ impl OperatorBuilder { &mut self, connection: Vec::Summary>>, ) -> ( - OutputWrapper>, + AsyncOutputHandle>, StreamCore, ) { - self.builder.new_output_connection(connection) + let (wrapper, stream) = self.builder.new_output_connection(connection); + + let handle = AsyncOutputHandle::new(wrapper); + + let mut flush_handle = handle.clone(); + self.output_flushes + .push(Box::new(move || flush_handle.cease())); + + (handle, stream) } /// Creates an operator implementation from supplied logic constructor. It returns a shutdown @@ -330,6 +457,7 @@ impl OperatorBuilder { let registered_wakers = self.registered_wakers; let shared_frontiers = self.shared_frontiers; let drain_pipe = self.drain_pipe; + let mut output_flushes = self.output_flushes; let token = Rc::new(RefCell::new(Some(()))); let button = ShutdownButton::new(Rc::clone(&token), self.activator); self.builder.build_reschedule(move |caps| { @@ -374,6 +502,10 @@ impl OperatorBuilder { // We're done with logic so deallocate the task logic_fut = None; } + // Flush all the outputs before exiting + for flush in output_flushes.iter_mut() { + (flush)(); + } } } // The timely operator needs to be kept alive if the task is pending @@ -429,9 +561,7 @@ mod test { let cap = cap.retain(); for item in data.iter().copied() { tokio::time::sleep(Duration::from_millis(10)).await; - let mut output_handle = output.activate(); - let mut session = output_handle.session(&cap); - session.give(item); + output.give(&cap, item).await; } } Event::Progress(_frontier) => {} diff --git a/src/timely-util/src/operator.rs b/src/timely-util/src/operator.rs index bf50b6fd563b9..6c699a6b1d3fc 100644 --- a/src/timely-util/src/operator.rs +++ b/src/timely-util/src/operator.rs @@ -19,14 +19,16 @@ use timely::dataflow::channels::pushers::Tee; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; use timely::dataflow::operators::generic::{ operator::{self, Operator}, - InputHandle, OperatorInfo, OutputHandle, OutputWrapper, + InputHandle, OperatorInfo, OutputHandle, }; use timely::dataflow::operators::Capability; use timely::dataflow::{Scope, Stream}; use timely::Data; use crate::buffer::ConsolidateBuffer; -use crate::builder_async::{AsyncInputHandle, OperatorBuilder as OperatorBuilderAsync}; +use crate::builder_async::{ + AsyncInputHandle, AsyncOutputHandle, OperatorBuilder as OperatorBuilderAsync, +}; /// Extension methods for timely [`Stream`]s. pub trait StreamExt @@ -75,7 +77,7 @@ where Capability, OperatorInfo, AsyncInputHandle, P::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P: ParallelizationContract; @@ -99,7 +101,7 @@ where OperatorInfo, AsyncInputHandle, P1::Puller>, AsyncInputHandle, P2::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P1: ParallelizationContract, @@ -253,7 +255,7 @@ where Capability, OperatorInfo, AsyncInputHandle, P::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P: ParallelizationContract, @@ -289,7 +291,7 @@ where OperatorInfo, AsyncInputHandle, P1::Puller>, AsyncInputHandle, P2::Puller>, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, P1: ParallelizationContract, @@ -451,7 +453,7 @@ where B: FnOnce( Capability, OperatorInfo, - OutputWrapper, Tee>, + AsyncOutputHandle, Tee>, ) -> BFut, BFut: Future + 'static, {