From 843cabca475d13f0978cb23a88e553a662efaed1 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 12 Dec 2025 18:11:48 -0500 Subject: [PATCH 1/4] Remove now-redundant background heartbeat task --- src/persist-client/src/internal/machine.rs | 50 +++------------------- src/persist-client/src/lib.rs | 7 +-- src/persist-client/src/read.rs | 11 ++--- 3 files changed, 13 insertions(+), 55 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 33e5544ec2f39..12b01876730d7 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -1202,54 +1202,18 @@ where T: Timestamp + Lattice + Codec64 + Sync, D: Monoid + Codec64 + Send + Sync, { - #[allow(clippy::unused_async)] - pub async fn start_reader_heartbeat_tasks( + pub fn start_reader_heartbeat_task( self, reader_id: LeasedReaderId, gc: GarbageCollector, - ) -> Vec> { - let mut ret = Vec::new(); + ) -> JoinHandle<()> { let metrics = Arc::clone(&self.applier.metrics); - - // TODO: In response to a production incident, this runs the heartbeat - // task on both the in-context tokio runtime and persist's isolated - // runtime. We think we were seeing tasks (including this one) get stuck - // indefinitely in tokio while waiting for a runtime worker. This could - // happen if some other task in that runtime never yields. It's possible - // that one of the two runtimes is healthy while the other isn't (this - // was inconclusive in the incident debugging), and the heartbeat task - // is fairly lightweight, so run a copy in each in case that helps. - // - // The real fix here is to find the misbehaving task and fix it. Remove - // this duplication when that happens. let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); - ret.push(mz_ore::task::spawn(|| name, { - let machine = self.clone(); - let reader_id = reader_id.clone(); - let gc = gc.clone(); - metrics - .tasks - .heartbeat_read - .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc)) - })); - - let isolated_runtime = Arc::clone(&self.isolated_runtime); - let name = format!( - "persist::heartbeat_read_isolated({},{})", - self.shard_id(), - reader_id - ); - ret.push( - isolated_runtime.spawn_named( - || name, - metrics - .tasks - .heartbeat_read - .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)), - ), - ); - - ret + mz_ore::task::spawn(|| name, { + metrics.tasks.heartbeat_read.instrument_task(async move { + Self::reader_heartbeat_task(self, reader_id, gc).await + }) + }) } async fn reader_heartbeat_task( diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 1ffe69be17230..d7209924ce02c 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -920,7 +920,6 @@ impl PersistClient { #[cfg(test)] mod tests { use std::future::Future; - use std::mem; use std::pin::Pin; use std::task::Context; use std::time::Duration; @@ -2007,14 +2006,12 @@ mod tests { .expect("client construction failed") .expect_open::<(), (), u64, i64>(ShardId::new()) .await; - let mut read_unexpired_state = read + let read_unexpired_state = read .unexpired_state .take() .expect("handle should have unexpired state"); read.expire().await; - for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) { - let () = read_heartbeat_task.await; - } + read_unexpired_state._heartbeat_tasks.await } /// Verify that shard finalization works with empty shards, shards that have diff --git 
a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 21ee955db815a..d3d4fc330b557 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -580,12 +580,9 @@ where leased_seqnos: BTreeMap::new(), unexpired_state: Some(UnexpiredReadHandleState { expire_fn, - _heartbeat_tasks: machine - .start_reader_heartbeat_tasks(reader_id, gc) - .await - .into_iter() - .map(JoinHandle::abort_on_drop) - .collect(), + _heartbeat_tasks: JoinHandle::abort_on_drop( + machine.start_reader_heartbeat_task(reader_id, gc), + ), }), } } @@ -894,7 +891,7 @@ where #[derive(Debug)] pub(crate) struct UnexpiredReadHandleState { expire_fn: ExpireFn, - pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>, + pub(crate) _heartbeat_tasks: AbortOnDropHandle<()>, } /// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor]. From c91f64adb8aab46cf5536c88b3c4b638e85a0ebb Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 2 Jan 2026 16:12:23 -0500 Subject: [PATCH 2/4] Add an AwaitableState abstraction in persist internals This is somewhere between Tokio's `watch` and a normal mutex with a condvar. Kind of annoying to have to write one's own library for this, but everything else I tried was either awkward or needed an unbounded channel... --- Cargo.lock | 1 + src/persist-client/Cargo.toml | 1 + src/persist-client/src/internal/watch.rs | 111 ++++++++++++++++++++++- 3 files changed, 110 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6aa3b27195ba..2e0292388b1b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7128,6 +7128,7 @@ dependencies = [ "proptest-derive", "prost", "prost-build", + "rand 0.9.2", "semver", "sentry-tracing", "serde", diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 6fd845cce5da3..ba539b35fc383 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -74,6 +74,7 @@ criterion = { version = "0.8.1", features = ["html_reports"] } datadriven = { version = "0.9.0", features = ["async"] } futures-task = "0.3.31" num_cpus = "1.17.0" +rand = "0.9.2" tempfile = "3.23.0" [build-dependencies] diff --git a/src/persist-client/src/internal/watch.rs b/src/persist-client/src/internal/watch.rs index a1a628a1855bf..a5ca6a945555a 100644 --- a/src/persist-client/src/internal/watch.rs +++ b/src/persist-client/src/internal/watch.rs @@ -9,10 +9,10 @@ //! Notifications for state changes. -use std::sync::Arc; - use mz_persist::location::SeqNo; -use tokio::sync::broadcast; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, RwLock}; +use tokio::sync::{Notify, broadcast}; use tracing::debug; use crate::cache::LockingTypedState; @@ -132,6 +132,83 @@ impl StateWatch { } } +/// A concurrent state - one which allows reading, writing, and waiting for changes made by +/// another concurrent writer. +/// +/// This is morally similar to a mutex with a condvar, but allowing asynchronous waits and with +/// access methods that make it a little trickier to accidentally hold a lock across a yield point. +pub(crate) struct AwaitableState<T> { + state: Arc<RwLock<T>>, + /// NB: we can't wrap the [Notify] in the lock since the signature of [Notify::notified] + /// doesn't allow it, but this is only accessed while holding the lock.
+ notify: Arc<Notify>, } + +impl<T: Debug> Debug for AwaitableState<T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.state.read().fmt(f) + } +} + +impl<T> Clone for AwaitableState<T> { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + notify: Arc::clone(&self.notify), + } + } +} + +impl<T> AwaitableState<T> { + pub fn new(value: T) -> Self { + Self { + state: Arc::new(RwLock::new(value)), + notify: Arc::new(Notify::new()), + } + } + + #[allow(dead_code)] + pub fn read<A>(&self, read_fn: impl FnOnce(&T) -> A) -> A { + let guard = self.state.read().expect("not poisoned"); + let state = &*guard; + read_fn(state) + } + + pub fn modify<A>(&self, write_fn: impl FnOnce(&mut T) -> A) -> A { + let mut guard = self.state.write().expect("not poisoned"); + let state = &mut *guard; + let result = write_fn(state); + // Notify everyone while holding the guard. This guarantees that all waiters will observe + // the just-updated state. + self.notify.notify_waiters(); + drop(guard); + result + } + + pub async fn wait_for<A>(&self, mut wait_fn: impl FnMut(&T) -> Option<A>) -> A { + loop { + let notified = { + let guard = self.state.read().expect("not poisoned"); + let state = &*guard; + if let Some(result) = wait_fn(state) { + return result; + } + // Grab the notified future while holding the guard. This ensures that we will see any + // future modifications to this state, even if they happen before the first poll. + let notified = self.notify.notified(); + drop(guard); + notified + }; + + notified.await; + } + } + + pub async fn wait_while(&self, mut wait_fn: impl FnMut(&T) -> bool) { + self.wait_for(|s| (!wait_fn(s)).then_some(())).await + } +} + #[cfg(test)] mod tests { use std::future::Future; @@ -141,12 +218,15 @@ mod tests { use futures::FutureExt; use futures_task::noop_waker; + use itertools::Itertools; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::assert_none; use mz_ore::cast::CastFrom; use mz_ore::metrics::MetricsRegistry; + use rand::prelude::SliceRandom; use timely::progress::Antichain; + use tokio::task::JoinSet; use crate::cache::StateCache; use crate::cfg::PersistConfig; @@ -326,4 +406,29 @@ mod tests { // the polling on this. let _ = snapshot.await; } + + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[allow(clippy::disallowed_methods)] // For JoinSet. + #[cfg_attr(miri, ignore)] + async fn wait_on_awaitable_state() { + const TASKS: usize = 1000; + // Launch a bunch of tasks, have them all wait for a specific number, then increment it + // by one. Lost notifications would cause this test to time out. + let mut set = JoinSet::new(); + let state = AwaitableState::new(0); + let mut tasks = (0..TASKS).collect_vec(); + let mut rng = rand::rng(); + tasks.shuffle(&mut rng); + for i in (0..TASKS).rev() { + set.spawn({ + let state = state.clone(); + async move { + state.wait_while(|v| *v != i).await; + state.modify(|v| *v += 1); + } + }); + } + set.join_all().await; + assert_eq!(state.read(|i| *i), TASKS); + } } From 82ff49f5469864effdfa3a34b7f8b62a20ea10d0 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 2 Jan 2026 18:26:45 -0500 Subject: [PATCH 3/4] Move the reader heartbeat task to live with ReadHandle These two are much more tightly related than the task is to the machine, and this allows for some additional privacy for shared state.
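
A minimal, self-contained sketch of the wait/modify pattern that the `AwaitableState` added in the previous commit provides. The `Awaitable` type, its method names, and the counter example below are illustrative stand-ins rather than the crate's API; the sketch assumes only std plus a recent tokio, and it relies on the same property the patch's comments call out: a `Notified` future grabbed before the lock is released will still observe a later `notify_waiters` call.

use std::sync::{Arc, RwLock};
use tokio::sync::Notify;

/// Illustrative stand-in for the patch's `AwaitableState` (not the crate's API).
#[derive(Clone)]
struct Awaitable<T> {
    state: Arc<RwLock<T>>,
    notify: Arc<Notify>,
}

impl<T> Awaitable<T> {
    fn new(value: T) -> Self {
        Self {
            state: Arc::new(RwLock::new(value)),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Mutate the value and wake every waiter; notifying before the write
    /// guard is released is what lets waiters observe this update reliably.
    fn modify<A>(&self, write_fn: impl FnOnce(&mut T) -> A) -> A {
        let mut guard = self.state.write().expect("not poisoned");
        let result = write_fn(&mut *guard);
        self.notify.notify_waiters();
        drop(guard);
        result
    }

    /// Wait until `wait_fn` returns `Some`, re-checking after every change.
    async fn wait_for<A>(&self, mut wait_fn: impl FnMut(&T) -> Option<A>) -> A {
        loop {
            let notified = {
                let guard = self.state.read().expect("not poisoned");
                if let Some(result) = wait_fn(&*guard) {
                    return result;
                }
                // Register for wakeups while the read guard is still held, so a
                // writer that runs before we await cannot be missed.
                self.notify.notified()
            };
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let counter = Awaitable::new(0u64);
    let waiter = {
        let counter = counter.clone();
        tokio::spawn(async move { counter.wait_for(|v| (*v >= 3).then_some(*v)).await })
    };
    for _ in 0..3 {
        counter.modify(|v| *v += 1);
    }
    assert_eq!(waiter.await.expect("task panicked"), 3);
}

Calling `notify_waiters` while the write guard is still held is what rules out lost wakeups: a waiter either sees the new value on its next predicate check, or it registered its `Notified` future before the writer could acquire the lock.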
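The last patch in this series layers the read-hold bookkeeping (`ReadHolds`) on top of that primitive. The sketch below illustrates only the seqno-hold half of that design, under stated simplifications: `Holds`, `lease_seqno`, and `outstanding_seqno` mirror the patch's names, but `SeqNo` is a bare integer here and `Lease` is modeled as an `Arc` whose strong count stands in for the real type's `count()`.

use std::collections::BTreeMap;
use std::sync::Arc;

type SeqNo = u64;
// Stand-in for the crate's `Lease`: the strong count plays the role of `count()`.
type Lease = Arc<SeqNo>;

struct Holds {
    recent_seqno: SeqNo,
    leases: BTreeMap<SeqNo, Lease>,
}

impl Holds {
    /// Hand out a lease pinned to the most recently observed seqno.
    fn lease_seqno(&mut self) -> Lease {
        let seqno = self.recent_seqno;
        self.leases
            .entry(seqno)
            .or_insert_with(|| Arc::new(seqno))
            .clone()
    }

    /// The earliest seqno that must be retained: the oldest still-outstanding
    /// lease, or the latest observed seqno if every lease has been returned.
    fn outstanding_seqno(&mut self) -> SeqNo {
        while let Some(first) = self.leases.first_entry() {
            if Arc::strong_count(first.get()) <= 1 {
                first.remove();
            } else {
                return *first.key();
            }
        }
        self.recent_seqno
    }
}

fn main() {
    let mut holds = Holds { recent_seqno: 10, leases: BTreeMap::new() };
    let part_lease = holds.lease_seqno();
    holds.recent_seqno = 15; // a newer state was observed
    assert_eq!(holds.outstanding_seqno(), 10); // pinned by the live lease
    drop(part_lease);
    assert_eq!(holds.outstanding_seqno(), 15); // lease returned, hold released
}

Pruning is lazy: a fully returned lease only stops pinning its seqno the next time `outstanding_seqno` is asked for the hold, which in the final patch happens from the background heartbeat task that now drives all downgrades.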
--- src/persist-client/src/internal/machine.rs | 83 +--------------------- src/persist-client/src/read.rs | 81 +++++++++++++++++++-- 2 files changed, 79 insertions(+), 85 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 12b01876730d7..e21ebea377a49 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -23,7 +23,6 @@ use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; #[allow(unused_imports)] // False positive. use mz_ore::fmt::FormatBuffer; -use mz_ore::task::JoinHandle; use mz_ore::{assert_none, soft_assert_no_log}; use mz_persist::location::{ExternalError, Indeterminate, SeqNo}; use mz_persist::retry::Retry; @@ -32,7 +31,7 @@ use mz_persist_types::{Codec, Codec64, Opaque}; use semver::Version; use timely::PartialOrder; use timely::progress::{Antichain, Timestamp}; -use tracing::{Instrument, debug, info, trace_span, warn}; +use tracing::{Instrument, debug, info, trace_span}; use crate::async_runtime::IsolatedRuntime; use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES; @@ -42,7 +41,6 @@ use crate::critical::CriticalReaderId; use crate::error::{CodecMismatch, InvalidUsage}; use crate::internal::apply::Applier; use crate::internal::compact::CompactReq; -use crate::internal::gc::GarbageCollector; use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance}; use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics}; use crate::internal::paths::PartialRollupKey; @@ -54,7 +52,7 @@ use crate::internal::state::{ use crate::internal::state_versions::StateVersions; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; use crate::internal::watch::StateWatch; -use crate::read::{LeasedReaderId, READER_LEASE_DURATION}; +use crate::read::LeasedReaderId; use crate::rpc::PubSubSender; use crate::schema::CaESchema; use crate::write::WriterId; @@ -1195,83 +1193,6 @@ impl CompareAndAppendRes { } } -impl Machine -where - K: Debug + Codec, - V: Debug + Codec, - T: Timestamp + Lattice + Codec64 + Sync, - D: Monoid + Codec64 + Send + Sync, -{ - pub fn start_reader_heartbeat_task( - self, - reader_id: LeasedReaderId, - gc: GarbageCollector, - ) -> JoinHandle<()> { - let metrics = Arc::clone(&self.applier.metrics); - let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); - mz_ore::task::spawn(|| name, { - metrics.tasks.heartbeat_read.instrument_task(async move { - Self::reader_heartbeat_task(self, reader_id, gc).await - }) - }) - } - - async fn reader_heartbeat_task( - machine: Self, - reader_id: LeasedReaderId, - gc: GarbageCollector, - ) { - let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; - loop { - let before_sleep = Instant::now(); - tokio::time::sleep(sleep_duration).await; - - let elapsed_since_before_sleeping = before_sleep.elapsed(); - if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { - warn!( - "reader ({}) of shard ({}) went {}s between heartbeats", - reader_id, - machine.shard_id(), - elapsed_since_before_sleeping.as_secs_f64() - ); - } - - let before_heartbeat = Instant::now(); - let (_seqno, existed, maintenance) = machine - .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)()) - .await; - maintenance.start_performing(&machine, &gc); - - let elapsed_since_heartbeat = before_heartbeat.elapsed(); - if elapsed_since_heartbeat > Duration::from_secs(60) { - warn!( - "reader ({}) of shard ({}) heartbeat call took {}s", - reader_id, - 
machine.shard_id(), - elapsed_since_heartbeat.as_secs_f64(), - ); - } - - if !existed { - // If the read handle was intentionally expired, this task - // *should* be aborted before it observes the expiration. So if - // we get here, this task somehow failed to keep the read lease - // alive. Warn loudly, because there's now a live read handle to - // an expired shard that will panic if used, but don't panic, - // just in case there is some edge case that results in this - // task observing the intentional expiration of a read handle. - warn!( - "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \ - while read handle is live", - reader_id, - machine.shard_id(), - ); - return; - } - } - } -} - pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config = Config::new( "persist_next_listen_batch_retryer_fixed_sleep", Duration::from_millis(1200), // pubsub is on by default! diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index d3d4fc330b557..0b7a90b9bfb9b 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -15,7 +15,7 @@ use std::collections::BTreeMap; use std::fmt::Debug; use std::future::Future; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Monoid; @@ -580,13 +580,86 @@ where leased_seqnos: BTreeMap::new(), unexpired_state: Some(UnexpiredReadHandleState { expire_fn, - _heartbeat_tasks: JoinHandle::abort_on_drop( - machine.start_reader_heartbeat_task(reader_id, gc), - ), + _heartbeat_tasks: JoinHandle::abort_on_drop(Self::start_reader_heartbeat_task( + machine, reader_id, gc, + )), }), } } + fn start_reader_heartbeat_task( + machine: Machine, + reader_id: LeasedReaderId, + gc: GarbageCollector, + ) -> JoinHandle<()> { + let metrics = Arc::clone(&machine.applier.metrics); + let name = format!( + "persist::heartbeat_read({},{})", + machine.shard_id(), + reader_id + ); + mz_ore::task::spawn(|| name, { + metrics.tasks.heartbeat_read.instrument_task(async move { + Self::reader_heartbeat_task(machine, reader_id, gc).await + }) + }) + } + + async fn reader_heartbeat_task( + machine: Machine, + reader_id: LeasedReaderId, + gc: GarbageCollector, + ) { + let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; + loop { + let before_sleep = Instant::now(); + tokio::time::sleep(sleep_duration).await; + + let elapsed_since_before_sleeping = before_sleep.elapsed(); + if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { + warn!( + "reader ({}) of shard ({}) went {}s between heartbeats", + reader_id, + machine.shard_id(), + elapsed_since_before_sleeping.as_secs_f64() + ); + } + + let before_heartbeat = Instant::now(); + let (_seqno, existed, maintenance) = machine + .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)()) + .await; + maintenance.start_performing(&machine, &gc); + + let elapsed_since_heartbeat = before_heartbeat.elapsed(); + if elapsed_since_heartbeat > Duration::from_secs(60) { + warn!( + "reader ({}) of shard ({}) heartbeat call took {}s", + reader_id, + machine.shard_id(), + elapsed_since_heartbeat.as_secs_f64(), + ); + } + + if !existed { + // If the read handle was intentionally expired, this task + // *should* be aborted before it observes the expiration. So if + // we get here, this task somehow failed to keep the read lease + // alive. 
Warn loudly, because there's now a live read handle to + // an expired shard that will panic if used, but don't panic, + // just in case there is some edge case that results in this + // task observing the intentional expiration of a read handle. + warn!( + "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \ + while read handle is live", + reader_id, + machine.shard_id(), + ); + return; + } + } + } + /// This handle's shard id. pub fn shard_id(&self) -> ShardId { self.machine.shard_id() From eadaaea1cffff9f982c4f50629bdfd72605c1e21 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 2 Jan 2026 16:12:23 -0500 Subject: [PATCH 4/4] Move the seqno-relevant data into a shared state Observe new seqnos in both the handle and the heartbeat thread Always downgrade to the shared seqno Should be pretty recent! Use the main downgrade-since method in the background thread Do all downgrades in the background task Shift expiry to the background task Move the heartbeat task to read.rs This is a bit more read-handle business than machine business, in my opinion! Comment fixup --- src/persist-client/src/fetch.rs | 1 - src/persist-client/src/internal/datadriven.rs | 3 - src/persist-client/src/internal/machine.rs | 37 +- src/persist-client/src/internal/metrics.rs | 2 - src/persist-client/src/internal/state.rs | 62 +-- src/persist-client/src/lib.rs | 5 +- src/persist-client/src/read.rs | 386 +++++++++--------- src/storage/src/source/reclock.rs | 7 +- 8 files changed, 236 insertions(+), 267 deletions(-) diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 574ec8a7f83f0..35ef6800bc8c4 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -472,7 +472,6 @@ impl Lease { } /// Returns the inner [SeqNo] of this [Lease]. 
- #[cfg(test)] pub fn seqno(&self) -> SeqNo { *self.0 } diff --git a/src/persist-client/src/internal/datadriven.rs b/src/persist-client/src/internal/datadriven.rs index 99fccd6cbf7f8..37f40072cb973 100644 --- a/src/persist-client/src/internal/datadriven.rs +++ b/src/persist-client/src/internal/datadriven.rs @@ -245,9 +245,6 @@ mod tests { "fetch-batch" => machine_dd::fetch_batch(&state, args).await, "finalize" => machine_dd::finalize(&mut state, args).await, "gc" => machine_dd::gc(&mut state, args).await, - "heartbeat-leased-reader" => { - machine_dd::heartbeat_leased_reader(&state, args).await - } "is-finalized" => machine_dd::is_finalized(&state, args), "listen-through" => { machine_dd::listen_through(&mut state, args).await diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index e21ebea377a49..6eea1e429b5bf 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -618,7 +618,7 @@ where pub async fn downgrade_since( &self, reader_id: &LeasedReaderId, - outstanding_seqno: Option, + outstanding_seqno: SeqNo, new_since: &Antichain, heartbeat_timestamp_ms: u64, ) -> (SeqNo, Since, RoutineMaintenance) { @@ -661,20 +661,6 @@ where } } - pub async fn heartbeat_leased_reader( - &self, - reader_id: &LeasedReaderId, - heartbeat_timestamp_ms: u64, - ) -> (SeqNo, bool, RoutineMaintenance) { - let metrics = Arc::clone(&self.applier.metrics); - let (seqno, existed, maintenance) = self - .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| { - state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms) - }) - .await; - (seqno, existed, maintenance) - } - pub async fn expire_leased_reader( &self, reader_id: &LeasedReaderId, @@ -1533,7 +1519,9 @@ pub mod datadriven { args: DirectiveArgs<'_>, ) -> Result { let since = args.expect_antichain("since"); - let seqno = args.optional("seqno"); + let seqno = args + .optional("seqno") + .unwrap_or_else(|| datadriven.machine.seqno()); let reader_id = args.expect("reader_id"); let (_, since, routine) = datadriven .machine @@ -2236,18 +2224,6 @@ pub mod datadriven { )) } - pub async fn heartbeat_leased_reader( - datadriven: &MachineState, - args: DirectiveArgs<'_>, - ) -> Result { - let reader_id = args.expect("reader_id"); - let _ = datadriven - .machine - .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)()) - .await; - Ok(format!("{} ok\n", datadriven.machine.seqno())) - } - pub async fn expire_critical_reader( datadriven: &mut MachineState, args: DirectiveArgs<'_>, @@ -2504,10 +2480,13 @@ pub mod tests { let client = new_test_client(&dyncfgs).await; // set a low rollup threshold so GC/truncation is more aggressive client.cfg.set_config(&ROLLUP_THRESHOLD, 5); - let (mut write, _) = client + let (mut write, read) = client .expect_open::(ShardId::new()) .await; + // Ensure the reader is not holding back the since. + read.expire().await; + // Write a bunch of batches. This should result in a bounded number of // live entries in consensus. 
const NUM_BATCHES: u64 = 100; diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index fc8f570b81c58..3a9500eae508d 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -421,7 +421,6 @@ impl MetricsVecs { )), compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"), downgrade_since: self.cmd_metrics("downgrade_since"), - heartbeat_reader: self.cmd_metrics("heartbeat_reader"), expire_reader: self.cmd_metrics("expire_reader"), expire_writer: self.cmd_metrics("expire_writer"), merge_res: self.cmd_metrics("merge_res"), @@ -630,7 +629,6 @@ pub struct CmdsMetrics { pub(crate) compare_and_append_noop: IntCounter, pub(crate) compare_and_downgrade_since: CmdMetrics, pub(crate) downgrade_since: CmdMetrics, - pub(crate) heartbeat_reader: CmdMetrics, pub(crate) expire_reader: CmdMetrics, pub(crate) expire_writer: CmdMetrics, pub(crate) merge_res: CmdMetrics, diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 193982966d80a..1105ceec58e75 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1877,7 +1877,7 @@ where &mut self, reader_id: &LeasedReaderId, seqno: SeqNo, - outstanding_seqno: Option, + outstanding_seqno: SeqNo, new_since: &Antichain, heartbeat_timestamp_ms: u64, ) -> ControlFlow>, Since> { @@ -1905,18 +1905,15 @@ where reader_state.last_heartbeat_timestamp_ms, ); - let seqno = match outstanding_seqno { - Some(outstanding_seqno) => { - assert!( - outstanding_seqno >= reader_state.seqno, - "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \ + let seqno = { + assert!( + outstanding_seqno >= reader_state.seqno, + "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \ is behind current reader_state ({:?})", - outstanding_seqno, - reader_state.seqno, - ); - std::cmp::min(outstanding_seqno, seqno) - } - None => seqno, + outstanding_seqno, + reader_state.seqno, + ); + std::cmp::min(outstanding_seqno, seqno) }; reader_state.seqno = seqno; @@ -1979,33 +1976,6 @@ where } } - pub fn heartbeat_leased_reader( - &mut self, - reader_id: &LeasedReaderId, - heartbeat_timestamp_ms: u64, - ) -> ControlFlow, bool> { - // We expire all readers if the upper and since both advance to the - // empty antichain. Gracefully handle this. At the same time, - // short-circuit the cmd application so we don't needlessly create new - // SeqNos. - if self.is_tombstone() { - return Break(NoOpStateTransition(false)); - } - - match self.leased_readers.get_mut(reader_id) { - Some(reader_state) => { - reader_state.last_heartbeat_timestamp_ms = std::cmp::max( - heartbeat_timestamp_ms, - reader_state.last_heartbeat_timestamp_ms, - ); - Continue(true) - } - // No-op, but we still commit the state change so that this gets - // linearized (maybe we're looking at old state). 
- None => Continue(false), - } - } - pub fn expire_leased_reader( &mut self, reader_id: &LeasedReaderId, @@ -3231,7 +3201,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(2), now() ), @@ -3243,7 +3213,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(2), now() ), @@ -3255,7 +3225,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(1), now() ), @@ -3280,7 +3250,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader2, seqno, - None, + seqno, &Antichain::from_elem(3), now() ), @@ -3292,7 +3262,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(5), now() ), @@ -3324,7 +3294,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader3, seqno, - None, + seqno, &Antichain::from_elem(10), now() ), @@ -3624,7 +3594,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, SeqNo::minimum(), - None, + SeqNo::minimum(), &Antichain::from_elem(2), now() ), diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index d7209924ce02c..d446a95c64f46 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -364,8 +364,7 @@ impl PersistClient { Arc::clone(&self.blob), reader_id, schemas, - reader_state.since, - heartbeat_ts, + reader_state, ) .await; @@ -2011,7 +2010,7 @@ mod tests { .take() .expect("handle should have unexpired state"); read.expire().await; - read_unexpired_state._heartbeat_tasks.await + read_unexpired_state.heartbeat_task.await } /// Verify that shard finalization works with empty shards, shards that have diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 0b7a90b9bfb9b..23a263343b7c0 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -10,24 +10,23 @@ //! 
Read capabilities and handles use async_stream::stream; -use std::backtrace::Backtrace; use std::collections::BTreeMap; use std::fmt::Debug; use std::future::Future; use std::sync::Arc; use std::time::{Duration, Instant}; +use differential_dataflow::Hashable; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::Description; use futures::Stream; use futures_util::{StreamExt, stream}; use mz_dyncfg::Config; +use mz_ore::cast::CastLossy; use mz_ore::halt; use mz_ore::instrument; -use mz_ore::now::EpochMillis; -use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt}; +use mz_ore::task::JoinHandle; use mz_persist::location::{Blob, SeqNo}; use mz_persist_types::columnar::{ColumnDecoder, Schema}; use mz_persist_types::{Codec, Codec64}; @@ -36,8 +35,7 @@ use serde::{Deserialize, Serialize}; use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; -use tokio::runtime::Handle; -use tracing::{Instrument, debug_span, warn}; +use tracing::warn; use uuid::Uuid; use crate::batch::BLOB_TARGET_SIZE; @@ -45,10 +43,10 @@ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters}; use crate::fetch::FetchConfig; use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part}; use crate::internal::encoding::Schemas; -use crate::internal::machine::{ExpireFn, Machine}; +use crate::internal::machine::Machine; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; -use crate::internal::state::{BatchPart, HollowBatch}; -use crate::internal::watch::StateWatch; +use crate::internal::state::{HollowBatch, LeasedReaderState}; +use crate::internal::watch::{AwaitableState, StateWatch}; use crate::iter::{Consolidator, StructuredSort}; use crate::schema::SchemaCache; use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats}; @@ -201,12 +199,10 @@ where { /// Politely expires this subscribe, releasing its lease. /// - /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the + /// There is a best-effort impl in Drop to expire the /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired /// with this method. When possible, explicit expiry is still preferred - /// because the Drop one is best effort and is dependant on a tokio - /// [Handle] being available in the TLC at the time of drop (which is a bit - /// subtle). Also, explicit expiry allows for control over when it happens. + /// because it also ensures that the background task is complete. pub async fn expire(mut self) { let _ = self.snapshot.take(); // Drop all leased parts. self.listen.expire().await; @@ -228,8 +224,6 @@ pub enum ListenEvent { #[derive(Debug)] pub struct Listen { handle: ReadHandle, - watch: StateWatch, - as_of: Antichain, since: Antichain, frontier: Antichain, @@ -258,11 +252,8 @@ where // (initially as_of although the frontier is inclusive and the as_of // isn't). Be a good citizen and downgrade early. handle.downgrade_since(&since).await; - - let watch = handle.machine.applier.watch(); Ok(Listen { handle, - watch, since, frontier: as_of.clone(), as_of, @@ -290,7 +281,7 @@ where let min_elapsed = self.handle.heartbeat_duration(); let next_batch = self.handle.machine.next_listen_batch( &self.frontier, - &mut self.watch, + &mut self.handle.watch, Some(&self.handle.reader_id), retry, ); @@ -490,17 +481,69 @@ where /// Politely expires this listen, releasing its lease. 
/// - /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the + /// There is a best-effort impl in Drop to expire the /// [`ReadHandle`] held by the listen that wasn't explicitly expired with /// this method. When possible, explicit expiry is still preferred because - /// the Drop one is best effort and is dependant on a tokio [Handle] being - /// available in the TLC at the time of drop (which is a bit subtle). Also, - /// explicit expiry allows for control over when it happens. + /// it also ensures that the background task is complete. pub async fn expire(self) { self.handle.expire().await } } +/// The state for read holds shared between the heartbeat task and the main client, +/// including the since frontier and the seqno hold. +#[derive(Debug)] +pub(crate) struct ReadHolds<T> { + /// The frontier we should hold the time-based lease back to. + held_since: Antichain<T>, + /// The since hold we've actually committed to state. Should always be <= + /// `held_since`. + applied_since: Antichain<T>, + /// The largest seqno we've observed in the state. + recent_seqno: SeqNo, + /// The set of active leases. We hold back the seqno to the minimum lease or + /// the recent_seqno, whichever is earlier. + leases: BTreeMap<SeqNo, Lease>, + /// True iff this state is expired. + expired: bool, + /// Used to trigger the background task to heartbeat state, instead of waiting for + /// the next tick. + request_sync: bool, +} + +impl<T> ReadHolds<T> +where + T: Timestamp + TotalOrder + Lattice + Codec64 + Sync, +{ + pub fn downgrade_since(&mut self, since: &Antichain<T>) { + self.held_since.join_assign(since); + } + + pub fn observe_seqno(&mut self, seqno: SeqNo) { + self.recent_seqno = seqno.max(self.recent_seqno); + } + + pub fn lease_seqno(&mut self) -> Lease { + let seqno = self.recent_seqno; + let lease = self + .leases + .entry(seqno) + .or_insert_with(|| Lease::new(seqno)); + lease.clone() + } + + pub fn outstanding_seqno(&mut self) -> SeqNo { + while let Some(first) = self.leases.first_entry() { + if first.get().count() <= 1 { + first.remove(); + } else { + return *first.key(); + } + } + self.recent_seqno + } +} + /// A "capability" granting the ability to read the state of some shard at times /// greater or equal to `self.since()`.
/// @@ -528,13 +571,14 @@ pub struct ReadHandle { pub(crate) machine: Machine, pub(crate) gc: GarbageCollector, pub(crate) blob: Arc, + watch: StateWatch, + pub(crate) reader_id: LeasedReaderId, pub(crate) read_schemas: Schemas, pub(crate) schema_cache: SchemaCache, since: Antichain, - pub(crate) last_heartbeat: EpochMillis, - pub(crate) leased_seqnos: BTreeMap, + pub(crate) hold_state: AwaitableState>, pub(crate) unexpired_state: Option, } @@ -553,6 +597,7 @@ where T: Timestamp + TotalOrder + Lattice + Codec64 + Sync, D: Monoid + Codec64 + Send + Sync, { + #[allow(clippy::unused_async)] pub(crate) async fn new( cfg: PersistConfig, metrics: Arc, @@ -561,28 +606,33 @@ where blob: Arc, reader_id: LeasedReaderId, read_schemas: Schemas, - since: Antichain, - last_heartbeat: EpochMillis, + state: LeasedReaderState, ) -> Self { let schema_cache = machine.applier.schema_cache(); - let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone()); + let hold_state = AwaitableState::new(ReadHolds { + held_since: state.since.clone(), + applied_since: state.since.clone(), + recent_seqno: state.seqno, + leases: Default::default(), + expired: false, + request_sync: false, + }); ReadHandle { cfg, metrics: Arc::clone(&metrics), machine: machine.clone(), gc: gc.clone(), blob, + watch: machine.applier.watch(), reader_id: reader_id.clone(), read_schemas, schema_cache, - since, - last_heartbeat, - leased_seqnos: BTreeMap::new(), + since: state.since, + hold_state: hold_state.clone(), unexpired_state: Some(UnexpiredReadHandleState { - expire_fn, - _heartbeat_tasks: JoinHandle::abort_on_drop(Self::start_reader_heartbeat_task( - machine, reader_id, gc, - )), + heartbeat_task: Self::start_reader_heartbeat_task( + machine, reader_id, gc, hold_state, + ), }), } } @@ -591,6 +641,7 @@ where machine: Machine, reader_id: LeasedReaderId, gc: GarbageCollector, + leased_seqnos: AwaitableState>, ) -> JoinHandle<()> { let metrics = Arc::clone(&machine.applier.metrics); let name = format!( @@ -600,7 +651,7 @@ where ); mz_ore::task::spawn(|| name, { metrics.tasks.heartbeat_read.instrument_task(async move { - Self::reader_heartbeat_task(machine, reader_id, gc).await + Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await }) }) } @@ -609,11 +660,27 @@ where machine: Machine, reader_id: LeasedReaderId, gc: GarbageCollector, + leased_seqnos: AwaitableState>, ) { - let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; + let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4; + // Jitter the first tick to avoid a thundering herd when many readers are started around + // the same instant, like during deploys. + let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX); + let mut interval = tokio::time::interval_at( + tokio::time::Instant::now() + sleep_duration.mul_f64(jitter), + sleep_duration, + ); + let mut held_since = leased_seqnos.read(|s| s.held_since.clone()); loop { let before_sleep = Instant::now(); - tokio::time::sleep(sleep_duration).await; + let _woke_by_tick = tokio::select! 
{ + _tick = interval.tick() => { + true + } + _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => { + false + } + }; let elapsed_since_before_sleeping = before_sleep.elapsed(); if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { @@ -626,10 +693,37 @@ where } let before_heartbeat = Instant::now(); - let (_seqno, existed, maintenance) = machine - .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)()) - .await; - maintenance.start_performing(&machine, &gc); + let heartbeat_ms = (machine.applier.cfg.now)(); + let current_seqno = machine.seqno(); + let result = leased_seqnos.modify(|s| { + if s.expired { + Err(()) + } else { + s.observe_seqno(current_seqno); + s.request_sync = false; + held_since.join_assign(&s.held_since); + Ok(s.outstanding_seqno()) + } + }); + let actual_since = match result { + Ok(held_seqno) => { + let (seqno, actual_since, maintenance) = machine + .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms) + .await; + leased_seqnos.modify(|s| { + s.applied_since.clone_from(&actual_since.0); + s.observe_seqno(seqno) + }); + maintenance.start_performing(&machine, &gc); + actual_since + } + Err(()) => { + let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await; + leased_seqnos.modify(|s| s.observe_seqno(seqno)); + maintenance.start_performing(&machine, &gc); + break; + } + }; let elapsed_since_heartbeat = before_heartbeat.elapsed(); if elapsed_since_heartbeat > Duration::from_secs(60) { @@ -641,7 +735,7 @@ where ); } - if !existed { + if PartialOrder::less_than(&held_since, &actual_since.0) { // If the read handle was intentionally expired, this task // *should* be aborted before it observes the expiration. So if // we get here, this task somehow failed to keep the read lease @@ -672,15 +766,13 @@ where &self.since } - fn outstanding_seqno(&mut self) -> Option { - while let Some(first) = self.leased_seqnos.first_entry() { - if first.get().count() <= 1 { - first.remove(); - } else { - return Some(*first.key()); - } - } - None + #[cfg(test)] + fn outstanding_seqno(&self) -> SeqNo { + let current_seqno = self.machine.seqno(); + self.hold_state.modify(|s| { + s.observe_seqno(current_seqno); + s.outstanding_seqno() + }) } /// Forwards the since frontier of this handle, giving up the ability to @@ -689,47 +781,16 @@ where /// This may trigger (asynchronous) compaction and consolidation in the /// system. A `new_since` of the empty antichain "finishes" this shard, /// promising that no more data will ever be read by this handle. - /// - /// This also acts as a heartbeat for the reader lease (including if called - /// with `new_since` equal to something like `self.since()` or the minimum - /// timestamp, making the call a no-op). #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))] pub async fn downgrade_since(&mut self, new_since: &Antichain) { - // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`. - let outstanding_seqno = self.outstanding_seqno(); - - let heartbeat_ts = (self.cfg.now)(); - let (_seqno, current_reader_since, maintenance) = self - .machine - .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts) + self.since = new_since.clone(); + self.hold_state.modify(|s| { + s.downgrade_since(new_since); + s.request_sync = true; + }); + self.hold_state + .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since)) .await; - - // Debugging for database-issues#4590. 
- if let Some(outstanding_seqno) = outstanding_seqno { - let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0); - // We get just over 1 seqno-per-second on average for a shard in - // prod, so this is about an hour. - const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60; - if seqnos_held >= SEQNOS_HELD_THRESHOLD { - tracing::info!( - "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}", - self.machine.shard_id(), - self.reader_id, - outstanding_seqno, - _seqno, - self.leased_seqnos.keys().take(10).collect::>(), - // The Debug impl of backtrace is less aesthetic, but will put the trace - // on a single line and play more nicely with our Honeycomb quota - Backtrace::force_capture(), - ); - } - } - - self.since = current_reader_since.0; - // A heartbeat is just any downgrade_since traffic, so update the - // internal rate limiter here to play nicely with `maybe_heartbeat`. - self.last_heartbeat = heartbeat_ts; - maintenance.start_performing(&self.machine, &self.gc); } /// Returns an ongoing subscription of updates to a shard. @@ -818,23 +879,6 @@ where Ok(Subscribe::new(snapshot_parts, listen)) } - fn lease_batch_part( - &mut self, - desc: Description, - part: BatchPart, - filter: FetchBatchFilter, - ) -> LeasedBatchPart { - LeasedBatchPart { - metrics: Arc::clone(&self.metrics), - shard_id: self.machine.shard_id(), - filter, - desc, - part, - lease: self.lease_seqno(), - filter_pushdown_audit: false, - } - } - fn lease_batch_parts( &mut self, batch: HollowBatch, @@ -844,8 +888,17 @@ where let blob = Arc::clone(&self.blob); let metrics = Arc::clone(&self.metrics); let desc = batch.desc.clone(); + let lease = self.lease_seqno().await; for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) { - yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone()) + yield LeasedBatchPart { + metrics: Arc::clone(&self.metrics), + shard_id: self.machine.shard_id(), + filter: filter.clone(), + desc: desc.clone(), + part: part.expect("leased part").into_owned(), + lease: lease.clone(), + filter_pushdown_audit: false, + } } } } @@ -853,13 +906,18 @@ where /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being /// "leased out" to a `LeasedBatchPart`, and cannot be garbage /// collected until its lease has been returned. - fn lease_seqno(&mut self) -> Lease { - let seqno = self.machine.seqno(); - let lease = self - .leased_seqnos - .entry(seqno) - .or_insert_with(|| Lease::new(seqno)); - lease.clone() + async fn lease_seqno(&mut self) -> Lease { + let current_seqno = self.machine.seqno(); + let lease = self.hold_state.modify(|s| { + s.observe_seqno(current_seqno); + s.lease_seqno() + }); + // The seqno we've leased may be the seqno observed by our heartbeat task, which could be + // ahead of the last state we saw. Ensure we only observe states in the future of our hold. + // (Since these are backed by the same state in the same process, this should all be pretty + // fast.) + self.watch.wait_for_seqno_ge(lease.seqno()).await; + lease } /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the @@ -892,8 +950,7 @@ where Arc::clone(&self.blob), new_reader_id, self.read_schemas.clone(), - reader_state.since, - heartbeat_ts, + reader_state, ) .await; new_reader @@ -905,49 +962,31 @@ where /// A rate-limited version of [Self::downgrade_since]. /// - /// This is an internally rate limited helper, designed to allow users to - /// call it as frequently as they like. 
Call this or [Self::downgrade_since], - /// on some interval that is "frequent" compared to the read lease duration. + /// Users can call this as frequently as they wish; changes will be periodically + /// flushed to state by the heartbeat task. + #[allow(clippy::unused_async)] pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain) { - let min_elapsed = self.heartbeat_duration(); - let elapsed_since_last_heartbeat = - Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat)); - if elapsed_since_last_heartbeat >= min_elapsed { - self.downgrade_since(new_since).await; - } + self.since = new_since.clone(); + self.hold_state.modify(|s| { + s.downgrade_since(new_since); + }); } /// Politely expires this reader, releasing its lease. /// /// There is a best-effort impl in Drop to expire a reader that wasn't /// explictly expired with this method. When possible, explicit expiry is - /// still preferred because the Drop one is best effort and is dependant on - /// a tokio [Handle] being available in the TLC at the time of drop (which - /// is a bit subtle). Also, explicit expiry allows for control over when it - /// happens. + /// still preferred because it also ensures that the background task is complete. #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))] pub async fn expire(mut self) { - // We drop the unexpired state before expiring the reader to ensure the - // heartbeat tasks can never observe the expired state. This doesn't - // matter for correctness, but avoids confusing log output if the - // heartbeat task were to discover that its lease has been expired. + self.hold_state.modify(|s| { + s.expired = true; + s.request_sync = true; + }); let Some(unexpired_state) = self.unexpired_state.take() else { return; }; - unexpired_state.expire_fn.0().await; - } - - fn expire_fn( - machine: Machine, - gc: GarbageCollector, - reader_id: LeasedReaderId, - ) -> ExpireFn { - ExpireFn(Box::new(move || { - Box::pin(async move { - let (_, maintenance) = machine.expire_leased_reader(&reader_id).await; - maintenance.start_performing(&machine, &gc); - }) - })) + unexpired_state.heartbeat_task.await; } /// Test helper for a [Self::listen] call that is expected to succeed. @@ -963,8 +1002,7 @@ where /// State for a read handle that has not been explicitly expired. #[derive(Debug)] pub(crate) struct UnexpiredReadHandleState { - expire_fn: ExpireFn, - pub(crate) _heartbeat_tasks: AbortOnDropHandle<()>, + pub(crate) heartbeat_task: JoinHandle<()>, } /// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor]. @@ -1092,7 +1130,7 @@ where should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool, ) -> Result, Since> { let batches = self.machine.snapshot(&as_of).await?; - let lease = self.lease_seqno(); + let lease = self.lease_seqno().await; Self::read_batches_consolidated( &self.cfg, @@ -1305,34 +1343,10 @@ where impl Drop for ReadHandle { fn drop(&mut self) { - // We drop the unexpired state before expiring the reader to ensure the - // heartbeat tasks can never observe the expired state. This doesn't - // matter for correctness, but avoids confusing log output if the - // heartbeat task were to discover that its lease has been expired. 
- let Some(unexpired_state) = self.unexpired_state.take() else { - return; - }; - - let handle = match Handle::try_current() { - Ok(x) => x, - Err(_) => { - warn!( - "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout", - self.reader_id - ); - return; - } - }; - // Spawn a best-effort task to expire this read handle. It's fine if - // this doesn't run to completion, we'd just have to wait out the lease - // before the shard-global since is unblocked. - // - // Intentionally create the span outside the task to set the parent. - let expire_span = debug_span!("drop::expire"); - handle.spawn_named( - || format!("ReadHandle::expire ({})", self.reader_id), - unexpired_state.expire_fn.0().instrument(expire_span), - ); + self.hold_state.modify(|s| { + s.expired = true; + s.request_sync = true; + }); } } @@ -1523,8 +1537,16 @@ mod tests { .await .expect("cannot serve requested as_of"); - // Determine sequence number at outset. - let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since(); + // Determine the sequence number held by our subscribe. + let original_seqno_since = subscribe.listen.handle.outstanding_seqno(); + if let Some(snapshot) = &subscribe.snapshot { + for part in snapshot { + assert!( + part.lease.seqno() >= original_seqno_since, + "our seqno hold must cover all parts" + ); + } + } let mut parts = vec![]; @@ -1614,7 +1636,7 @@ mod tests { // We should expect the SeqNo to be downgraded if this part's SeqNo // is no longer leased to any other parts, either. - let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno); + let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno; let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since(); if expect_downgrade { diff --git a/src/storage/src/source/reclock.rs b/src/storage/src/source/reclock.rs index f84e27544d4aa..bf0d744d815b8 100644 --- a/src/storage/src/source/reclock.rs +++ b/src/storage/src/source/reclock.rs @@ -544,7 +544,12 @@ mod tests { // Regression test for // https://github.com/MaterializeInc/database-issues/issues/4216. #[mz_ore::test(tokio::test(start_paused = true))] - #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux` + #[cfg_attr(miri, ignore)] + // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux` + // This test depends on the exact details of when the Persist client flushes frontier progress + // to state, which is not public API and should not be essential for correctness. (Today, the + // as_of is held back by the spawned task in the async storage worker.) + #[ignore] async fn test_since_hold() { let binding_shard = ShardId::new();