diff --git a/Cargo.lock b/Cargo.lock index b6aa3b27195ba..2e0292388b1b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7128,6 +7128,7 @@ dependencies = [ "proptest-derive", "prost", "prost-build", + "rand 0.9.2", "semver", "sentry-tracing", "serde", diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 6fd845cce5da3..ba539b35fc383 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -74,6 +74,7 @@ criterion = { version = "0.8.1", features = ["html_reports"] } datadriven = { version = "0.9.0", features = ["async"] } futures-task = "0.3.31" num_cpus = "1.17.0" +rand = "0.9.2" tempfile = "3.23.0" [build-dependencies] diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 574ec8a7f83f0..35ef6800bc8c4 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -472,7 +472,6 @@ impl Lease { } /// Returns the inner [SeqNo] of this [Lease]. - #[cfg(test)] pub fn seqno(&self) -> SeqNo { *self.0 } diff --git a/src/persist-client/src/internal/datadriven.rs b/src/persist-client/src/internal/datadriven.rs index 99fccd6cbf7f8..37f40072cb973 100644 --- a/src/persist-client/src/internal/datadriven.rs +++ b/src/persist-client/src/internal/datadriven.rs @@ -245,9 +245,6 @@ mod tests { "fetch-batch" => machine_dd::fetch_batch(&state, args).await, "finalize" => machine_dd::finalize(&mut state, args).await, "gc" => machine_dd::gc(&mut state, args).await, - "heartbeat-leased-reader" => { - machine_dd::heartbeat_leased_reader(&state, args).await - } "is-finalized" => machine_dd::is_finalized(&state, args), "listen-through" => { machine_dd::listen_through(&mut state, args).await diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 33e5544ec2f39..6eea1e429b5bf 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -23,7 +23,6 @@ use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; #[allow(unused_imports)] // False positive. 
use mz_ore::fmt::FormatBuffer; -use mz_ore::task::JoinHandle; use mz_ore::{assert_none, soft_assert_no_log}; use mz_persist::location::{ExternalError, Indeterminate, SeqNo}; use mz_persist::retry::Retry; @@ -32,7 +31,7 @@ use mz_persist_types::{Codec, Codec64, Opaque}; use semver::Version; use timely::PartialOrder; use timely::progress::{Antichain, Timestamp}; -use tracing::{Instrument, debug, info, trace_span, warn}; +use tracing::{Instrument, debug, info, trace_span}; use crate::async_runtime::IsolatedRuntime; use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES; @@ -42,7 +41,6 @@ use crate::critical::CriticalReaderId; use crate::error::{CodecMismatch, InvalidUsage}; use crate::internal::apply::Applier; use crate::internal::compact::CompactReq; -use crate::internal::gc::GarbageCollector; use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance}; use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics}; use crate::internal::paths::PartialRollupKey; @@ -54,7 +52,7 @@ use crate::internal::state::{ use crate::internal::state_versions::StateVersions; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; use crate::internal::watch::StateWatch; -use crate::read::{LeasedReaderId, READER_LEASE_DURATION}; +use crate::read::LeasedReaderId; use crate::rpc::PubSubSender; use crate::schema::CaESchema; use crate::write::WriterId; @@ -620,7 +618,7 @@ where pub async fn downgrade_since( &self, reader_id: &LeasedReaderId, - outstanding_seqno: Option, + outstanding_seqno: SeqNo, new_since: &Antichain, heartbeat_timestamp_ms: u64, ) -> (SeqNo, Since, RoutineMaintenance) { @@ -663,20 +661,6 @@ where } } - pub async fn heartbeat_leased_reader( - &self, - reader_id: &LeasedReaderId, - heartbeat_timestamp_ms: u64, - ) -> (SeqNo, bool, RoutineMaintenance) { - let metrics = Arc::clone(&self.applier.metrics); - let (seqno, existed, maintenance) = self - .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| { - state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms) - }) - .await; - (seqno, existed, maintenance) - } - pub async fn expire_leased_reader( &self, reader_id: &LeasedReaderId, @@ -1195,119 +1179,6 @@ impl CompareAndAppendRes { } } -impl Machine -where - K: Debug + Codec, - V: Debug + Codec, - T: Timestamp + Lattice + Codec64 + Sync, - D: Monoid + Codec64 + Send + Sync, -{ - #[allow(clippy::unused_async)] - pub async fn start_reader_heartbeat_tasks( - self, - reader_id: LeasedReaderId, - gc: GarbageCollector, - ) -> Vec> { - let mut ret = Vec::new(); - let metrics = Arc::clone(&self.applier.metrics); - - // TODO: In response to a production incident, this runs the heartbeat - // task on both the in-context tokio runtime and persist's isolated - // runtime. We think we were seeing tasks (including this one) get stuck - // indefinitely in tokio while waiting for a runtime worker. This could - // happen if some other task in that runtime never yields. It's possible - // that one of the two runtimes is healthy while the other isn't (this - // was inconclusive in the incident debugging), and the heartbeat task - // is fairly lightweight, so run a copy in each in case that helps. - // - // The real fix here is to find the misbehaving task and fix it. Remove - // this duplication when that happens. 
- let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); - ret.push(mz_ore::task::spawn(|| name, { - let machine = self.clone(); - let reader_id = reader_id.clone(); - let gc = gc.clone(); - metrics - .tasks - .heartbeat_read - .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc)) - })); - - let isolated_runtime = Arc::clone(&self.isolated_runtime); - let name = format!( - "persist::heartbeat_read_isolated({},{})", - self.shard_id(), - reader_id - ); - ret.push( - isolated_runtime.spawn_named( - || name, - metrics - .tasks - .heartbeat_read - .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)), - ), - ); - - ret - } - - async fn reader_heartbeat_task( - machine: Self, - reader_id: LeasedReaderId, - gc: GarbageCollector, - ) { - let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; - loop { - let before_sleep = Instant::now(); - tokio::time::sleep(sleep_duration).await; - - let elapsed_since_before_sleeping = before_sleep.elapsed(); - if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { - warn!( - "reader ({}) of shard ({}) went {}s between heartbeats", - reader_id, - machine.shard_id(), - elapsed_since_before_sleeping.as_secs_f64() - ); - } - - let before_heartbeat = Instant::now(); - let (_seqno, existed, maintenance) = machine - .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)()) - .await; - maintenance.start_performing(&machine, &gc); - - let elapsed_since_heartbeat = before_heartbeat.elapsed(); - if elapsed_since_heartbeat > Duration::from_secs(60) { - warn!( - "reader ({}) of shard ({}) heartbeat call took {}s", - reader_id, - machine.shard_id(), - elapsed_since_heartbeat.as_secs_f64(), - ); - } - - if !existed { - // If the read handle was intentionally expired, this task - // *should* be aborted before it observes the expiration. So if - // we get here, this task somehow failed to keep the read lease - // alive. Warn loudly, because there's now a live read handle to - // an expired shard that will panic if used, but don't panic, - // just in case there is some edge case that results in this - // task observing the intentional expiration of a read handle. - warn!( - "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \ - while read handle is live", - reader_id, - machine.shard_id(), - ); - return; - } - } - } -} - pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config = Config::new( "persist_next_listen_batch_retryer_fixed_sleep", Duration::from_millis(1200), // pubsub is on by default! 
@@ -1648,7 +1519,9 @@ pub mod datadriven { args: DirectiveArgs<'_>, ) -> Result { let since = args.expect_antichain("since"); - let seqno = args.optional("seqno"); + let seqno = args + .optional("seqno") + .unwrap_or_else(|| datadriven.machine.seqno()); let reader_id = args.expect("reader_id"); let (_, since, routine) = datadriven .machine @@ -2351,18 +2224,6 @@ pub mod datadriven { )) } - pub async fn heartbeat_leased_reader( - datadriven: &MachineState, - args: DirectiveArgs<'_>, - ) -> Result { - let reader_id = args.expect("reader_id"); - let _ = datadriven - .machine - .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)()) - .await; - Ok(format!("{} ok\n", datadriven.machine.seqno())) - } - pub async fn expire_critical_reader( datadriven: &mut MachineState, args: DirectiveArgs<'_>, @@ -2619,10 +2480,13 @@ pub mod tests { let client = new_test_client(&dyncfgs).await; // set a low rollup threshold so GC/truncation is more aggressive client.cfg.set_config(&ROLLUP_THRESHOLD, 5); - let (mut write, _) = client + let (mut write, read) = client .expect_open::(ShardId::new()) .await; + // Ensure the reader is not holding back the since. + read.expire().await; + // Write a bunch of batches. This should result in a bounded number of // live entries in consensus. const NUM_BATCHES: u64 = 100; diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index fc8f570b81c58..3a9500eae508d 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -421,7 +421,6 @@ impl MetricsVecs { )), compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"), downgrade_since: self.cmd_metrics("downgrade_since"), - heartbeat_reader: self.cmd_metrics("heartbeat_reader"), expire_reader: self.cmd_metrics("expire_reader"), expire_writer: self.cmd_metrics("expire_writer"), merge_res: self.cmd_metrics("merge_res"), @@ -630,7 +629,6 @@ pub struct CmdsMetrics { pub(crate) compare_and_append_noop: IntCounter, pub(crate) compare_and_downgrade_since: CmdMetrics, pub(crate) downgrade_since: CmdMetrics, - pub(crate) heartbeat_reader: CmdMetrics, pub(crate) expire_reader: CmdMetrics, pub(crate) expire_writer: CmdMetrics, pub(crate) merge_res: CmdMetrics, diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 193982966d80a..1105ceec58e75 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1877,7 +1877,7 @@ where &mut self, reader_id: &LeasedReaderId, seqno: SeqNo, - outstanding_seqno: Option, + outstanding_seqno: SeqNo, new_since: &Antichain, heartbeat_timestamp_ms: u64, ) -> ControlFlow>, Since> { @@ -1905,18 +1905,15 @@ where reader_state.last_heartbeat_timestamp_ms, ); - let seqno = match outstanding_seqno { - Some(outstanding_seqno) => { - assert!( - outstanding_seqno >= reader_state.seqno, - "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \ + let seqno = { + assert!( + outstanding_seqno >= reader_state.seqno, + "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \ is behind current reader_state ({:?})", - outstanding_seqno, - reader_state.seqno, - ); - std::cmp::min(outstanding_seqno, seqno) - } - None => seqno, + outstanding_seqno, + reader_state.seqno, + ); + std::cmp::min(outstanding_seqno, seqno) }; reader_state.seqno = seqno; @@ -1979,33 +1976,6 @@ where } } - pub fn heartbeat_leased_reader( - &mut self, - reader_id: &LeasedReaderId, - heartbeat_timestamp_ms: u64, - ) -> 
ControlFlow, bool> { - // We expire all readers if the upper and since both advance to the - // empty antichain. Gracefully handle this. At the same time, - // short-circuit the cmd application so we don't needlessly create new - // SeqNos. - if self.is_tombstone() { - return Break(NoOpStateTransition(false)); - } - - match self.leased_readers.get_mut(reader_id) { - Some(reader_state) => { - reader_state.last_heartbeat_timestamp_ms = std::cmp::max( - heartbeat_timestamp_ms, - reader_state.last_heartbeat_timestamp_ms, - ); - Continue(true) - } - // No-op, but we still commit the state change so that this gets - // linearized (maybe we're looking at old state). - None => Continue(false), - } - } - pub fn expire_leased_reader( &mut self, reader_id: &LeasedReaderId, @@ -3231,7 +3201,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(2), now() ), @@ -3243,7 +3213,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(2), now() ), @@ -3255,7 +3225,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(1), now() ), @@ -3280,7 +3250,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader2, seqno, - None, + seqno, &Antichain::from_elem(3), now() ), @@ -3292,7 +3262,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(5), now() ), @@ -3324,7 +3294,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader3, seqno, - None, + seqno, &Antichain::from_elem(10), now() ), @@ -3624,7 +3594,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, SeqNo::minimum(), - None, + SeqNo::minimum(), &Antichain::from_elem(2), now() ), diff --git a/src/persist-client/src/internal/watch.rs b/src/persist-client/src/internal/watch.rs index a1a628a1855bf..a5ca6a945555a 100644 --- a/src/persist-client/src/internal/watch.rs +++ b/src/persist-client/src/internal/watch.rs @@ -9,10 +9,10 @@ //! Notifications for state changes. -use std::sync::Arc; - use mz_persist::location::SeqNo; -use tokio::sync::broadcast; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, RwLock}; +use tokio::sync::{Notify, broadcast}; use tracing::debug; use crate::cache::LockingTypedState; @@ -132,6 +132,83 @@ impl StateWatch { } } +/// A concurrent state - one which allows reading, writing, and waiting for changes made by +/// another concurrent writer. +/// +/// This is morally similar to a mutex with a condvar, but allowing asynchronous waits and with +/// access methods that make it a little trickier to accidentally hold a lock across a yield point. +pub(crate) struct AwaitableState { + state: Arc>, + /// NB: we can't wrap the [Notify] in the lock since the signature of [Notify::notified] + /// doesn't allow it, but this is only accessed while holding the lock. 
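A minimal, self-contained sketch of the pattern the `NB` above relies on (hypothetical names; only `std` and `tokio` assumed, not the crate's own types): the `Notified` future is created while the lock is still held, the lock is dropped, and only then is the future awaited. Recent tokio versions guarantee that a `Notified` future created before a `notify_waiters` call is woken even if it has not been polled yet, so registering it after releasing the lock could miss an update that races in between.

```rust
use std::sync::{Arc, RwLock};
use tokio::sync::Notify;

#[derive(Clone)]
struct Shared {
    value: Arc<RwLock<u64>>,
    notify: Arc<Notify>,
}

impl Shared {
    fn set(&self, v: u64) {
        *self.value.write().expect("not poisoned") = v;
        // Wake every `Notified` future registered before this write.
        self.notify.notify_waiters();
    }

    async fn wait_until(&self, target: u64) {
        loop {
            let notified = {
                let guard = self.value.read().expect("not poisoned");
                if *guard >= target {
                    return;
                }
                // Register for wakeups while the guard is still held, so a write
                // that happens between unlock and await cannot be missed.
                self.notify.notified()
            };
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let shared = Shared {
        value: Arc::new(RwLock::new(0)),
        notify: Arc::new(Notify::new()),
    };
    let waiter = tokio::spawn({
        let shared = shared.clone();
        async move { shared.wait_until(3).await }
    });
    for v in 1..=3 {
        shared.set(v);
    }
    waiter.await.expect("waiter succeeded");
}
```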
+ notify: Arc, +} + +impl Debug for AwaitableState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.state.read().fmt(f) + } +} + +impl Clone for AwaitableState { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + notify: Arc::clone(&self.notify), + } + } +} + +impl AwaitableState { + pub fn new(value: T) -> Self { + Self { + state: Arc::new(RwLock::new(value)), + notify: Arc::new(Notify::new()), + } + } + + #[allow(dead_code)] + pub fn read(&self, read_fn: impl FnOnce(&T) -> A) -> A { + let guard = self.state.read().expect("not poisoned"); + let state = &*guard; + read_fn(state) + } + + pub fn modify(&self, write_fn: impl FnOnce(&mut T) -> A) -> A { + let mut guard = self.state.write().expect("not poisoned"); + let state = &mut *guard; + let result = write_fn(state); + // Notify everyone while holding the guard. This guarantees that all waiters will observe + // the just-updated state. + self.notify.notify_waiters(); + drop(guard); + result + } + + pub async fn wait_for(&self, mut wait_fn: impl FnMut(&T) -> Option) -> A { + loop { + let notified = { + let guard = self.state.read().expect("not poisoned"); + let state = &*guard; + if let Some(result) = wait_fn(state) { + return result; + } + // Grab the notified future while holding the guard. This ensures that we will see any + // future modifications to this state, even if they happen before the first poll. + let notified = self.notify.notified(); + drop(guard); + notified + }; + + notified.await; + } + } + + pub async fn wait_while(&self, mut wait_fn: impl FnMut(&T) -> bool) { + self.wait_for(|s| (!wait_fn(s)).then_some(())).await + } +} + #[cfg(test)] mod tests { use std::future::Future; @@ -141,12 +218,15 @@ mod tests { use futures::FutureExt; use futures_task::noop_waker; + use itertools::Itertools; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::assert_none; use mz_ore::cast::CastFrom; use mz_ore::metrics::MetricsRegistry; + use rand::prelude::SliceRandom; use timely::progress::Antichain; + use tokio::task::JoinSet; use crate::cache::StateCache; use crate::cfg::PersistConfig; @@ -326,4 +406,29 @@ mod tests { // the polling on this. let _ = snapshot.await; } + + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[allow(clippy::disallowed_methods)] // For JoinSet. + #[cfg_attr(miri, ignore)] + async fn wait_on_awaitable_state() { + const TASKS: usize = 1000; + // Launch a bunch of tasks, have them all wait for a specific number, then increment it + // by one. Lost notifications would cause this test to time out. 
+ let mut set = JoinSet::new(); + let state = AwaitableState::new(0); + let mut tasks = (0..TASKS).collect_vec(); + let mut rng = rand::rng(); + tasks.shuffle(&mut rng); + for i in (0..TASKS).rev() { + set.spawn({ + let state = state.clone(); + async move { + state.wait_while(|v| *v != i).await; + state.modify(|v| *v += 1); + } + }); + } + set.join_all().await; + assert_eq!(state.read(|i| *i), TASKS); + } } diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 1ffe69be17230..d446a95c64f46 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -364,8 +364,7 @@ impl PersistClient { Arc::clone(&self.blob), reader_id, schemas, - reader_state.since, - heartbeat_ts, + reader_state, ) .await; @@ -920,7 +919,6 @@ impl PersistClient { #[cfg(test)] mod tests { use std::future::Future; - use std::mem; use std::pin::Pin; use std::task::Context; use std::time::Duration; @@ -2007,14 +2005,12 @@ mod tests { .expect("client construction failed") .expect_open::<(), (), u64, i64>(ShardId::new()) .await; - let mut read_unexpired_state = read + let read_unexpired_state = read .unexpired_state .take() .expect("handle should have unexpired state"); read.expire().await; - for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) { - let () = read_heartbeat_task.await; - } + read_unexpired_state.heartbeat_task.await } /// Verify that shard finalization works with empty shards, shards that have diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 21ee955db815a..23a263343b7c0 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -10,24 +10,23 @@ //! Read capabilities and handles use async_stream::stream; -use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::fmt::Debug; use std::future::Future; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; +use differential_dataflow::Hashable; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::Description; use futures::Stream; use futures_util::{StreamExt, stream}; use mz_dyncfg::Config; +use mz_ore::cast::CastLossy; use mz_ore::halt; use mz_ore::instrument; -use mz_ore::now::EpochMillis; -use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt}; +use mz_ore::task::JoinHandle; use mz_persist::location::{Blob, SeqNo}; use mz_persist_types::columnar::{ColumnDecoder, Schema}; use mz_persist_types::{Codec, Codec64}; @@ -36,8 +35,7 @@ use serde::{Deserialize, Serialize}; use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; -use tokio::runtime::Handle; -use tracing::{Instrument, debug_span, warn}; +use tracing::warn; use uuid::Uuid; use crate::batch::BLOB_TARGET_SIZE; @@ -45,10 +43,10 @@ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters}; use crate::fetch::FetchConfig; use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part}; use crate::internal::encoding::Schemas; -use crate::internal::machine::{ExpireFn, Machine}; +use crate::internal::machine::Machine; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; -use crate::internal::state::{BatchPart, HollowBatch}; -use crate::internal::watch::StateWatch; +use crate::internal::state::{HollowBatch, LeasedReaderState}; +use crate::internal::watch::{AwaitableState, StateWatch}; use 
crate::iter::{Consolidator, StructuredSort}; use crate::schema::SchemaCache; use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats}; @@ -201,12 +199,10 @@ where { /// Politely expires this subscribe, releasing its lease. /// - /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the + /// There is a best-effort impl in Drop to expire the /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired /// with this method. When possible, explicit expiry is still preferred - /// because the Drop one is best effort and is dependant on a tokio - /// [Handle] being available in the TLC at the time of drop (which is a bit - /// subtle). Also, explicit expiry allows for control over when it happens. + /// because it also ensures that the background task is complete. pub async fn expire(mut self) { let _ = self.snapshot.take(); // Drop all leased parts. self.listen.expire().await; @@ -228,8 +224,6 @@ pub enum ListenEvent { #[derive(Debug)] pub struct Listen { handle: ReadHandle, - watch: StateWatch, - as_of: Antichain, since: Antichain, frontier: Antichain, @@ -258,11 +252,8 @@ where // (initially as_of although the frontier is inclusive and the as_of // isn't). Be a good citizen and downgrade early. handle.downgrade_since(&since).await; - - let watch = handle.machine.applier.watch(); Ok(Listen { handle, - watch, since, frontier: as_of.clone(), as_of, @@ -290,7 +281,7 @@ where let min_elapsed = self.handle.heartbeat_duration(); let next_batch = self.handle.machine.next_listen_batch( &self.frontier, - &mut self.watch, + &mut self.handle.watch, Some(&self.handle.reader_id), retry, ); @@ -490,17 +481,69 @@ where /// Politely expires this listen, releasing its lease. /// - /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the + /// There is a best-effort impl in to expire the /// [`ReadHandle`] held by the listen that wasn't explicitly expired with /// this method. When possible, explicit expiry is still preferred because - /// the Drop one is best effort and is dependant on a tokio [Handle] being - /// available in the TLC at the time of drop (which is a bit subtle). Also, - /// explicit expiry allows for control over when it happens. + /// it also ensures that the background task is complete. pub async fn expire(self) { self.handle.expire().await } } +/// The state for read holds shared between the heartbeat task and the main client, +/// including the since frontier and the seqno hold. +#[derive(Debug)] +pub(crate) struct ReadHolds { + /// The frontier we should hold the time-based lease back to. + held_since: Antichain, + /// The since hold we've actually committed to state. Should always be <= + /// the since hold. + applied_since: Antichain, + /// The largest seqno we've observed in the state. + recent_seqno: SeqNo, + /// The set of active leases. We hold back the seqno to the minimum lease or + /// the recent_seqno, whichever is earlier. + leases: BTreeMap, + /// True iff this state is expired. + expired: bool, + /// Used to trigger the background task to heartbeat state, instead of waiting for + /// the next tick. 
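To make the split these fields describe concrete: the foreground handle only records its desired hold and flips `request_sync`, while the background heartbeat task is the one that writes to durable state and records what was actually applied; `downgrade_since` then waits for the applied frontier to catch up. A hypothetical, self-contained sketch of that handshake, using `tokio::sync::watch` channels as a stand-in for `AwaitableState` (names and the `watch`-based plumbing are illustrative, not the crate's code):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // "Desired" is written by the foreground handle; "applied" by the background task.
    let (desired_tx, mut desired_rx) = watch::channel(0u64);
    let (applied_tx, mut applied_rx) = watch::channel(0u64);

    // Background "heartbeat" task: whenever the desired hold changes, push it to
    // (stand-in) durable state and record what was applied.
    tokio::spawn(async move {
        while desired_rx.changed().await.is_ok() {
            let want = *desired_rx.borrow_and_update();
            // ... the real task would issue the state machine command here ...
            applied_tx.send_replace(want);
        }
    });

    // Foreground: record the desired downgrade, then wait until it has been applied.
    desired_tx.send_replace(5);
    applied_rx
        .wait_for(|applied| *applied >= 5)
        .await
        .expect("heartbeat task alive");
    println!("since hold downgraded through 5");
}
```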
+ request_sync: bool, +} + +impl ReadHolds +where + T: Timestamp + TotalOrder + Lattice + Codec64 + Sync, +{ + pub fn downgrade_since(&mut self, since: &Antichain) { + self.held_since.join_assign(since); + } + + pub fn observe_seqno(&mut self, seqno: SeqNo) { + self.recent_seqno = seqno.max(self.recent_seqno); + } + + pub fn lease_seqno(&mut self) -> Lease { + let seqno = self.recent_seqno; + let lease = self + .leases + .entry(seqno) + .or_insert_with(|| Lease::new(seqno)); + lease.clone() + } + + pub fn outstanding_seqno(&mut self) -> SeqNo { + while let Some(first) = self.leases.first_entry() { + if first.get().count() <= 1 { + first.remove(); + } else { + return *first.key(); + } + } + self.recent_seqno + } +} + /// A "capability" granting the ability to read the state of some shard at times /// greater or equal to `self.since()`. /// @@ -528,13 +571,14 @@ pub struct ReadHandle { pub(crate) machine: Machine, pub(crate) gc: GarbageCollector, pub(crate) blob: Arc, + watch: StateWatch, + pub(crate) reader_id: LeasedReaderId, pub(crate) read_schemas: Schemas, pub(crate) schema_cache: SchemaCache, since: Antichain, - pub(crate) last_heartbeat: EpochMillis, - pub(crate) leased_seqnos: BTreeMap, + pub(crate) hold_state: AwaitableState>, pub(crate) unexpired_state: Option, } @@ -553,6 +597,7 @@ where T: Timestamp + TotalOrder + Lattice + Codec64 + Sync, D: Monoid + Codec64 + Send + Sync, { + #[allow(clippy::unused_async)] pub(crate) async fn new( cfg: PersistConfig, metrics: Arc, @@ -561,35 +606,154 @@ where blob: Arc, reader_id: LeasedReaderId, read_schemas: Schemas, - since: Antichain, - last_heartbeat: EpochMillis, + state: LeasedReaderState, ) -> Self { let schema_cache = machine.applier.schema_cache(); - let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone()); + let hold_state = AwaitableState::new(ReadHolds { + held_since: state.since.clone(), + applied_since: state.since.clone(), + recent_seqno: state.seqno, + leases: Default::default(), + expired: false, + request_sync: false, + }); ReadHandle { cfg, metrics: Arc::clone(&metrics), machine: machine.clone(), gc: gc.clone(), blob, + watch: machine.applier.watch(), reader_id: reader_id.clone(), read_schemas, schema_cache, - since, - last_heartbeat, - leased_seqnos: BTreeMap::new(), + since: state.since, + hold_state: hold_state.clone(), unexpired_state: Some(UnexpiredReadHandleState { - expire_fn, - _heartbeat_tasks: machine - .start_reader_heartbeat_tasks(reader_id, gc) - .await - .into_iter() - .map(JoinHandle::abort_on_drop) - .collect(), + heartbeat_task: Self::start_reader_heartbeat_task( + machine, reader_id, gc, hold_state, + ), }), } } + fn start_reader_heartbeat_task( + machine: Machine, + reader_id: LeasedReaderId, + gc: GarbageCollector, + leased_seqnos: AwaitableState>, + ) -> JoinHandle<()> { + let metrics = Arc::clone(&machine.applier.metrics); + let name = format!( + "persist::heartbeat_read({},{})", + machine.shard_id(), + reader_id + ); + mz_ore::task::spawn(|| name, { + metrics.tasks.heartbeat_read.instrument_task(async move { + Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await + }) + }) + } + + async fn reader_heartbeat_task( + machine: Machine, + reader_id: LeasedReaderId, + gc: GarbageCollector, + leased_seqnos: AwaitableState>, + ) { + let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4; + // Jitter the first tick to avoid a thundering herd when many readers are started around + // the same instant, like during deploys. 
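The jitter computed just below hashes the reader id into a fraction of the lease period and offsets only the first tick by that amount, so readers started at the same instant do not all heartbeat together. A standalone illustration of the same idea (hypothetical reader ids; `DefaultHasher` stands in for the `Hashable` and `CastLossy` helpers used in the real code):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

/// Map an id to a deterministic fraction in [0, 1].
fn jitter_fraction(id: &str) -> f64 {
    let mut hasher = DefaultHasher::new();
    id.hash(&mut hasher);
    hasher.finish() as f64 / u64::MAX as f64
}

#[tokio::main]
async fn main() {
    let period = Duration::from_secs(8);
    for id in ["reader-a", "reader-b", "reader-c"] {
        // Offset only the first tick; later ticks keep the regular period.
        let first_tick_in = period.mul_f64(jitter_fraction(id));
        let _interval =
            tokio::time::interval_at(tokio::time::Instant::now() + first_tick_in, period);
        println!("{id}: first heartbeat after {first_tick_in:?}");
    }
}
```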
+ let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX); + let mut interval = tokio::time::interval_at( + tokio::time::Instant::now() + sleep_duration.mul_f64(jitter), + sleep_duration, + ); + let mut held_since = leased_seqnos.read(|s| s.held_since.clone()); + loop { + let before_sleep = Instant::now(); + let _woke_by_tick = tokio::select! { + _tick = interval.tick() => { + true + } + _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => { + false + } + }; + + let elapsed_since_before_sleeping = before_sleep.elapsed(); + if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { + warn!( + "reader ({}) of shard ({}) went {}s between heartbeats", + reader_id, + machine.shard_id(), + elapsed_since_before_sleeping.as_secs_f64() + ); + } + + let before_heartbeat = Instant::now(); + let heartbeat_ms = (machine.applier.cfg.now)(); + let current_seqno = machine.seqno(); + let result = leased_seqnos.modify(|s| { + if s.expired { + Err(()) + } else { + s.observe_seqno(current_seqno); + s.request_sync = false; + held_since.join_assign(&s.held_since); + Ok(s.outstanding_seqno()) + } + }); + let actual_since = match result { + Ok(held_seqno) => { + let (seqno, actual_since, maintenance) = machine + .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms) + .await; + leased_seqnos.modify(|s| { + s.applied_since.clone_from(&actual_since.0); + s.observe_seqno(seqno) + }); + maintenance.start_performing(&machine, &gc); + actual_since + } + Err(()) => { + let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await; + leased_seqnos.modify(|s| s.observe_seqno(seqno)); + maintenance.start_performing(&machine, &gc); + break; + } + }; + + let elapsed_since_heartbeat = before_heartbeat.elapsed(); + if elapsed_since_heartbeat > Duration::from_secs(60) { + warn!( + "reader ({}) of shard ({}) heartbeat call took {}s", + reader_id, + machine.shard_id(), + elapsed_since_heartbeat.as_secs_f64(), + ); + } + + if PartialOrder::less_than(&held_since, &actual_since.0) { + // If the read handle was intentionally expired, this task + // *should* be aborted before it observes the expiration. So if + // we get here, this task somehow failed to keep the read lease + // alive. Warn loudly, because there's now a live read handle to + // an expired shard that will panic if used, but don't panic, + // just in case there is some edge case that results in this + // task observing the intentional expiration of a read handle. + warn!( + "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \ + while read handle is live", + reader_id, + machine.shard_id(), + ); + return; + } + } + } + /// This handle's shard id. pub fn shard_id(&self) -> ShardId { self.machine.shard_id() @@ -602,15 +766,13 @@ where &self.since } - fn outstanding_seqno(&mut self) -> Option { - while let Some(first) = self.leased_seqnos.first_entry() { - if first.get().count() <= 1 { - first.remove(); - } else { - return Some(*first.key()); - } - } - None + #[cfg(test)] + fn outstanding_seqno(&self) -> SeqNo { + let current_seqno = self.machine.seqno(); + self.hold_state.modify(|s| { + s.observe_seqno(current_seqno); + s.outstanding_seqno() + }) } /// Forwards the since frontier of this handle, giving up the ability to @@ -619,47 +781,16 @@ where /// This may trigger (asynchronous) compaction and consolidation in the /// system. A `new_since` of the empty antichain "finishes" this shard, /// promising that no more data will ever be read by this handle. 
- /// - /// This also acts as a heartbeat for the reader lease (including if called - /// with `new_since` equal to something like `self.since()` or the minimum - /// timestamp, making the call a no-op). #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))] pub async fn downgrade_since(&mut self, new_since: &Antichain) { - // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`. - let outstanding_seqno = self.outstanding_seqno(); - - let heartbeat_ts = (self.cfg.now)(); - let (_seqno, current_reader_since, maintenance) = self - .machine - .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts) + self.since = new_since.clone(); + self.hold_state.modify(|s| { + s.downgrade_since(new_since); + s.request_sync = true; + }); + self.hold_state + .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since)) .await; - - // Debugging for database-issues#4590. - if let Some(outstanding_seqno) = outstanding_seqno { - let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0); - // We get just over 1 seqno-per-second on average for a shard in - // prod, so this is about an hour. - const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60; - if seqnos_held >= SEQNOS_HELD_THRESHOLD { - tracing::info!( - "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}", - self.machine.shard_id(), - self.reader_id, - outstanding_seqno, - _seqno, - self.leased_seqnos.keys().take(10).collect::>(), - // The Debug impl of backtrace is less aesthetic, but will put the trace - // on a single line and play more nicely with our Honeycomb quota - Backtrace::force_capture(), - ); - } - } - - self.since = current_reader_since.0; - // A heartbeat is just any downgrade_since traffic, so update the - // internal rate limiter here to play nicely with `maybe_heartbeat`. - self.last_heartbeat = heartbeat_ts; - maintenance.start_performing(&self.machine, &self.gc); } /// Returns an ongoing subscription of updates to a shard. @@ -748,23 +879,6 @@ where Ok(Subscribe::new(snapshot_parts, listen)) } - fn lease_batch_part( - &mut self, - desc: Description, - part: BatchPart, - filter: FetchBatchFilter, - ) -> LeasedBatchPart { - LeasedBatchPart { - metrics: Arc::clone(&self.metrics), - shard_id: self.machine.shard_id(), - filter, - desc, - part, - lease: self.lease_seqno(), - filter_pushdown_audit: false, - } - } - fn lease_batch_parts( &mut self, batch: HollowBatch, @@ -774,8 +888,17 @@ where let blob = Arc::clone(&self.blob); let metrics = Arc::clone(&self.metrics); let desc = batch.desc.clone(); + let lease = self.lease_seqno().await; for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) { - yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone()) + yield LeasedBatchPart { + metrics: Arc::clone(&self.metrics), + shard_id: self.machine.shard_id(), + filter: filter.clone(), + desc: desc.clone(), + part: part.expect("leased part").into_owned(), + lease: lease.clone(), + filter_pushdown_audit: false, + } } } } @@ -783,13 +906,18 @@ where /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being /// "leased out" to a `LeasedBatchPart`, and cannot be garbage /// collected until its lease has been returned. 
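The lease bookkeeping behind this doc comment: each `LeasedBatchPart` clones a lease handle, and a seqno hold can be released once the tracking map owns the only remaining copy. A hypothetical standalone version of that counting logic (not the crate's `Lease` from `fetch.rs`), mirroring the `outstanding_seqno` loop in `ReadHolds` above:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// A cloneable hold on a sequence number; illustration only.
#[derive(Clone, Debug)]
struct Lease(Arc<u64>);

impl Lease {
    fn new(seqno: u64) -> Self {
        Lease(Arc::new(seqno))
    }
    /// How many copies of this lease exist, including the tracking map's own.
    fn count(&self) -> usize {
        Arc::strong_count(&self.0)
    }
}

/// The oldest seqno that still has an outstanding lease; entries whose only copy is
/// the map's own are dropped, and `fallback` is returned if nothing is held.
fn outstanding_seqno(leases: &mut BTreeMap<u64, Lease>, fallback: u64) -> u64 {
    while let Some(entry) = leases.first_entry() {
        if entry.get().count() <= 1 {
            entry.remove();
        } else {
            return *entry.key();
        }
    }
    fallback
}

fn main() {
    let mut leases = BTreeMap::new();
    let part_lease = leases.entry(7).or_insert_with(|| Lease::new(7)).clone();
    assert_eq!(outstanding_seqno(&mut leases, 9), 7); // a part still holds seqno 7
    drop(part_lease);
    assert_eq!(outstanding_seqno(&mut leases, 9), 9); // hold released; nothing pins 7
}
```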
- fn lease_seqno(&mut self) -> Lease { - let seqno = self.machine.seqno(); - let lease = self - .leased_seqnos - .entry(seqno) - .or_insert_with(|| Lease::new(seqno)); - lease.clone() + async fn lease_seqno(&mut self) -> Lease { + let current_seqno = self.machine.seqno(); + let lease = self.hold_state.modify(|s| { + s.observe_seqno(current_seqno); + s.lease_seqno() + }); + // The seqno we've leased may be the seqno observed by our heartbeat task, which could be + // ahead of the last state we saw. Ensure we only observe states in the future of our hold. + // (Since these are backed by the same state in the same process, this should all be pretty + // fast.) + self.watch.wait_for_seqno_ge(lease.seqno()).await; + lease } /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the @@ -822,8 +950,7 @@ where Arc::clone(&self.blob), new_reader_id, self.read_schemas.clone(), - reader_state.since, - heartbeat_ts, + reader_state, ) .await; new_reader @@ -835,49 +962,31 @@ where /// A rate-limited version of [Self::downgrade_since]. /// - /// This is an internally rate limited helper, designed to allow users to - /// call it as frequently as they like. Call this or [Self::downgrade_since], - /// on some interval that is "frequent" compared to the read lease duration. + /// Users can call this as frequently as they wish; changes will be periodically + /// flushed to state by the heartbeat task. + #[allow(clippy::unused_async)] pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain) { - let min_elapsed = self.heartbeat_duration(); - let elapsed_since_last_heartbeat = - Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat)); - if elapsed_since_last_heartbeat >= min_elapsed { - self.downgrade_since(new_since).await; - } + self.since = new_since.clone(); + self.hold_state.modify(|s| { + s.downgrade_since(new_since); + }); } /// Politely expires this reader, releasing its lease. /// /// There is a best-effort impl in Drop to expire a reader that wasn't /// explictly expired with this method. When possible, explicit expiry is - /// still preferred because the Drop one is best effort and is dependant on - /// a tokio [Handle] being available in the TLC at the time of drop (which - /// is a bit subtle). Also, explicit expiry allows for control over when it - /// happens. + /// still preferred because it also ensures that the background task is complete. #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))] pub async fn expire(mut self) { - // We drop the unexpired state before expiring the reader to ensure the - // heartbeat tasks can never observe the expired state. This doesn't - // matter for correctness, but avoids confusing log output if the - // heartbeat task were to discover that its lease has been expired. + self.hold_state.modify(|s| { + s.expired = true; + s.request_sync = true; + }); let Some(unexpired_state) = self.unexpired_state.take() else { return; }; - unexpired_state.expire_fn.0().await; - } - - fn expire_fn( - machine: Machine, - gc: GarbageCollector, - reader_id: LeasedReaderId, - ) -> ExpireFn { - ExpireFn(Box::new(move || { - Box::pin(async move { - let (_, maintenance) = machine.expire_leased_reader(&reader_id).await; - maintenance.start_performing(&machine, &gc); - }) - })) + unexpired_state.heartbeat_task.await; } /// Test helper for a [Self::listen] call that is expected to succeed. @@ -893,8 +1002,7 @@ where /// State for a read handle that has not been explicitly expired. 
#[derive(Debug)] pub(crate) struct UnexpiredReadHandleState { - expire_fn: ExpireFn, - pub(crate) _heartbeat_tasks: Vec>, + pub(crate) heartbeat_task: JoinHandle<()>, } /// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor]. @@ -1022,7 +1130,7 @@ where should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool, ) -> Result, Since> { let batches = self.machine.snapshot(&as_of).await?; - let lease = self.lease_seqno(); + let lease = self.lease_seqno().await; Self::read_batches_consolidated( &self.cfg, @@ -1235,34 +1343,10 @@ where impl Drop for ReadHandle { fn drop(&mut self) { - // We drop the unexpired state before expiring the reader to ensure the - // heartbeat tasks can never observe the expired state. This doesn't - // matter for correctness, but avoids confusing log output if the - // heartbeat task were to discover that its lease has been expired. - let Some(unexpired_state) = self.unexpired_state.take() else { - return; - }; - - let handle = match Handle::try_current() { - Ok(x) => x, - Err(_) => { - warn!( - "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout", - self.reader_id - ); - return; - } - }; - // Spawn a best-effort task to expire this read handle. It's fine if - // this doesn't run to completion, we'd just have to wait out the lease - // before the shard-global since is unblocked. - // - // Intentionally create the span outside the task to set the parent. - let expire_span = debug_span!("drop::expire"); - handle.spawn_named( - || format!("ReadHandle::expire ({})", self.reader_id), - unexpired_state.expire_fn.0().instrument(expire_span), - ); + self.hold_state.modify(|s| { + s.expired = true; + s.request_sync = true; + }); } } @@ -1453,8 +1537,16 @@ mod tests { .await .expect("cannot serve requested as_of"); - // Determine sequence number at outset. - let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since(); + // Determine the sequence number held by our subscribe. + let original_seqno_since = subscribe.listen.handle.outstanding_seqno(); + if let Some(snapshot) = &subscribe.snapshot { + for part in snapshot { + assert!( + part.lease.seqno() >= original_seqno_since, + "our seqno hold must cover all parts" + ); + } + } let mut parts = vec![]; @@ -1544,7 +1636,7 @@ mod tests { // We should expect the SeqNo to be downgraded if this part's SeqNo // is no longer leased to any other parts, either. - let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno); + let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno; let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since(); if expect_downgrade { diff --git a/src/storage/src/source/reclock.rs b/src/storage/src/source/reclock.rs index f84e27544d4aa..bf0d744d815b8 100644 --- a/src/storage/src/source/reclock.rs +++ b/src/storage/src/source/reclock.rs @@ -544,7 +544,12 @@ mod tests { // Regression test for // https://github.com/MaterializeInc/database-issues/issues/4216. #[mz_ore::test(tokio::test(start_paused = true))] - #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux` + #[cfg_attr(miri, ignore)] + // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux` + // This test depends on the exact details of when the Persist client flushes frontier progress + // to state, which is not public API and should not be essential for correctness. 
(Today, the + // as_of is held back by the spawned task in the async storage worker.) + #[ignore] async fn test_since_hold() { let binding_shard = ShardId::new();