From 62077b076aa5db69333c493af694a6494aa4476d Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Fri, 28 Nov 2025 13:23:26 +0100
Subject: [PATCH 1/2] adapter: handle AddSink (storage exports) via catalog implications

---
 src/adapter/src/coord/catalog_implications.rs | 102 +++++++++++++++++-
 src/adapter/src/coord/ddl.rs                  |  86 +--------------
 src/adapter/src/coord/sequencer/inner.rs      |  18 +---
 3 files changed, 105 insertions(+), 101 deletions(-)

diff --git a/src/adapter/src/coord/catalog_implications.rs b/src/adapter/src/coord/catalog_implications.rs
index 5a7c4bc305f50..90fd915e1fb1b 100644
--- a/src/adapter/src/coord/catalog_implications.rs
+++ b/src/adapter/src/coord/catalog_implications.rs
@@ -50,10 +50,11 @@ use mz_ore::str::StrExt;
 use mz_ore::task;
 use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
 use mz_sql::plan::ConnectionDetails;
-use mz_storage_client::controller::{CollectionDescription, DataSource};
+use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
 use mz_storage_types::connections::PostgresConnection;
 use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
 use mz_storage_types::sinks::StorageSinkConnection;
+use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
 use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
 use tracing::{Instrument, info_span, warn};

@@ -188,6 +189,7 @@ impl Coordinator {
         let mut storage_policies_to_initialize = BTreeMap::new();
         let mut execution_timestamps_to_set = BTreeSet::new();
         let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
+        let mut sinks_to_create: Vec<(CatalogItemId, Sink)> = vec![];

         // Replacing a materialized view causes the replacement's catalog entry
         // to be dropped. Its compute and storage collections are transferred to
@@ -301,7 +303,7 @@
                 }
             }
             CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
-                tracing::debug!(?sink, "not handling AddSink in here yet");
+                sinks_to_create.push((catalog_id, sink));
             }
             CatalogImplication::Sink(CatalogImplicationKind::Altered {
                 prev: prev_sink,
@@ -559,6 +561,14 @@
             self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                 .await?;
         }
+
+        // Create sinks (storage exports) - must happen before initializing read
+        // policies so sink global_ids are included in storage_policies_to_initialize.
+        for (_item_id, sink) in sinks_to_create {
+            self.handle_create_sink(&mut storage_policies_to_initialize, sink)
+                .await?;
+        }
+
         // It is _very_ important that we only initialize read policies after we
         // have created all the sources/collections. Some of the sources created
         // in this collection might have dependencies on other sources, so the
@@ -1446,6 +1456,94 @@
             }
         }
     }
+
+    #[instrument(level = "debug")]
+    async fn handle_create_sink(
+        &mut self,
+        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
+        sink: Sink,
+    ) -> Result<(), AdapterError> {
+        let global_id = sink.global_id();
+
+        // Validate `sink.from` is in fact a storage collection
+        self.controller.storage.check_exists(sink.from)?;
+
+        // The AsOf is used to determine at what time to snapshot reading from
+        // the persist collection. This is primarily relevant when we do _not_
+        // want to include the snapshot in the sink.
+        //
+        // We choose the smallest as_of that is legal, according to the sinked
+        // collection's since.
+        let id_bundle = crate::CollectionIdBundle {
+            storage_ids: BTreeSet::from([sink.from]),
+            compute_ids: BTreeMap::new(),
+        };
+
+        // We're putting in place read holds, such that create_collections, below,
+        // which calls update_read_capabilities, can successfully do so.
+        // Otherwise, the since of dependencies might move along concurrently,
+        // pulling the rug from under us!
+
+        // TODO: Maybe in the future, pass those holds on to storage, to hold on
+        // to them and downgrade when possible?
+        let read_holds = self.acquire_read_holds(&id_bundle);
+        let as_of = read_holds.least_valid_read();
+
+        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
+        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
+            from: sink.from,
+            from_desc: storage_sink_from_entry
+                .relation_desc()
+                .expect("sinks can only be built on items with descs")
+                .into_owned(),
+            connection: sink
+                .connection
+                .clone()
+                .into_inline_connection(self.catalog().state()),
+            envelope: sink.envelope,
+            as_of,
+            with_snapshot: sink.with_snapshot,
+            version: sink.version,
+            from_storage_metadata: (),
+            to_storage_metadata: (),
+            commit_interval: sink.commit_interval,
+        };
+
+        let collection_desc = CollectionDescription {
+            // TODO(sinks): make generic once we have more than one sink type.
+            desc: KAFKA_PROGRESS_DESC.clone(),
+            data_source: DataSource::Sink {
+                desc: ExportDescription {
+                    sink: storage_sink_desc,
+                    instance_id: sink.cluster_id,
+                },
+            },
+            since: None,
+            status_collection_id: None,
+            timeline: None,
+            primary: None,
+        };
+
+        // Create the collection.
+        let storage_metadata = self.catalog.state().storage_metadata();
+        self.controller
+            .storage
+            .create_collections(storage_metadata, None, vec![(global_id, collection_desc)])
+            .await
+            .unwrap_or_terminate("cannot fail to create exports");
+
+        // Drop read holds after the export has been created, at which point
+        // storage will have put in its own read holds.
+        drop(read_holds);
+
+        // Initialize read policies for the sink
+        storage_policies_to_initialize
+            .entry(CompactionWindow::Default)
+            .or_default()
+            .insert(global_id);
+
+        Ok(())
+    }
 }

 /// A state machine for building catalog implications from catalog updates.
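The new handle_create_sink relies on a specific ordering: acquire read holds on the sink's input collection, use the least valid read as the sink's as_of, create the export, and only release the holds once storage has installed its own. The standalone Rust sketch below illustrates that ordering under simplifying assumptions; ReadHold, StorageController, and the plain u64 timestamps are stand-ins invented for this example and are not the real controller API.

// Stand-in for a read hold: it pins the earliest readable time (the since)
// of a collection so compaction cannot advance past it while we hold it.
struct ReadHold {
    collection: &'static str,
    since: u64,
}

// Stand-in for the storage controller, tracking created exports.
struct StorageController {
    exports: Vec<(&'static str, u64)>, // (export name, as_of)
}

impl StorageController {
    // Creating the export must happen while the hold is alive, so that the
    // chosen as_of is still a readable time of the input collection.
    fn create_export(&mut self, name: &'static str, as_of: u64, hold: &ReadHold) {
        assert!(as_of >= hold.since, "as_of must not lag the held since");
        self.exports.push((name, as_of));
        // In the real system, storage installs its own read hold here.
    }
}

fn main() {
    let mut storage = StorageController { exports: Vec::new() };

    // 1. Acquire a hold on the sink's input collection.
    let hold = ReadHold { collection: "sink_input", since: 42 };
    println!("holding {} at since {}", hold.collection, hold.since);

    // 2. The smallest legal as_of is the least valid read of the input.
    let as_of = hold.since;

    // 3. Create the export while the hold keeps `as_of` readable.
    storage.create_export("kafka_sink_progress", as_of, &hold);

    // 4. Only now is it safe to release our hold; storage has its own.
    drop(hold);

    println!("created exports: {:?}", storage.exports);
}

If the hold were dropped before create_export, concurrent compaction could advance the input's since past the chosen as_of, and creating the export at that time would no longer be legal.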
diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 924fab9174898..7ba88f277c887 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -16,12 +16,11 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};

 use fail::fail_point;
-use maplit::{btreemap, btreeset};
 use mz_adapter_types::compaction::SINCE_GRANULARITY;
 use mz_adapter_types::connection::ConnectionId;
 use mz_audit_log::VersionedEvent;
 use mz_catalog::SYSTEM_CONN_ID;
-use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
+use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc};
 use mz_cluster_client::ReplicaId;
 use mz_controller::clusters::ReplicaLocation;
 use mz_controller_types::ClusterId;
@@ -42,10 +41,7 @@ use mz_sql::session::vars::{
     MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
     MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
 };
-use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
-use mz_storage_types::connections::inline::IntoInlineConnection;
 use mz_storage_types::read_policy::ReadPolicy;
-use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
 use serde_json::json;
 use tracing::{Instrument, Level, event, info_span, warn};

@@ -941,86 +937,6 @@ impl Coordinator {
             .set_limit(webhook_request_limit);
     }

-    pub(crate) async fn create_storage_export(
-        &mut self,
-        id: GlobalId,
-        sink: &Sink,
-    ) -> Result<(), AdapterError> {
-        // Validate `sink.from` is in fact a storage collection
-        self.controller.storage.check_exists(sink.from)?;
-
-        // The AsOf is used to determine at what time to snapshot reading from
-        // the persist collection. This is primarily relevant when we do _not_
-        // want to include the snapshot in the sink.
-        //
-        // We choose the smallest as_of that is legal, according to the sinked
-        // collection's since.
-        let id_bundle = crate::CollectionIdBundle {
-            storage_ids: btreeset! {sink.from},
-            compute_ids: btreemap! {},
-        };
-
-        // We're putting in place read holds, such that create_exports, below,
-        // which calls update_read_capabilities, can successfully do so.
-        // Otherwise, the since of dependencies might move along concurrently,
-        // pulling the rug from under us!
-        //
-        // TODO: Maybe in the future, pass those holds on to storage, to hold on
-        // to them and downgrade when possible?
-        let read_holds = self.acquire_read_holds(&id_bundle);
-        let as_of = read_holds.least_valid_read();
-
-        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
-        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
-            from: sink.from,
-            from_desc: storage_sink_from_entry
-                .relation_desc()
-                .expect("sinks can only be built on items with descs")
-                .into_owned(),
-            connection: sink
-                .connection
-                .clone()
-                .into_inline_connection(self.catalog().state()),
-            envelope: sink.envelope,
-            as_of,
-            with_snapshot: sink.with_snapshot,
-            version: sink.version,
-            from_storage_metadata: (),
-            to_storage_metadata: (),
-            commit_interval: sink.commit_interval,
-        };
-
-        let collection_desc = CollectionDescription {
-            // TODO(sinks): make generic once we have more than one sink type.
-            desc: KAFKA_PROGRESS_DESC.clone(),
-            data_source: DataSource::Sink {
-                desc: ExportDescription {
-                    sink: storage_sink_desc,
-                    instance_id: sink.cluster_id,
-                },
-            },
-            since: None,
-            status_collection_id: None,
-            timeline: None,
-            primary: None,
-        };
-        let collections = vec![(id, collection_desc)];
-
-        // Create the collections.
-        let storage_metadata = self.catalog.state().storage_metadata();
-        let res = self
-            .controller
-            .storage
-            .create_collections(storage_metadata, None, collections)
-            .await;
-
-        // Drop read holds after the export has been created, at which point
-        // storage will have put in its own read holds.
-        drop(read_holds);
-
-        Ok(res?)
-    }
-
     /// Validate all resource limits in a catalog transaction and return an error if that limit is
     /// exceeded.
     fn validate_resource_limits(
diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs
index fc9ef91ce35fc..8ee1ab6253d2c 100644
--- a/src/adapter/src/coord/sequencer/inner.rs
+++ b/src/adapter/src/coord/sequencer/inner.rs
@@ -18,7 +18,6 @@ use anyhow::anyhow;
 use futures::future::{BoxFuture, FutureExt};
 use futures::{Future, StreamExt, future};
 use itertools::Itertools;
-use mz_adapter_types::compaction::CompactionWindow;
 use mz_adapter_types::connection::ConnectionId;
 use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
 use mz_catalog::memory::objects::{
@@ -1094,7 +1093,9 @@ impl Coordinator {
         let result = self.catalog_transact(Some(ctx.session()), ops).await;

         match result {
-            Ok(()) => {}
+            Ok(()) => {
+                ctx.retire(Ok(ExecuteResponse::CreatedSink));
+            }
             Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                 kind:
                     mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
             })) if if_not_exists => {
                 ctx.session()
                     .add_notice(AdapterNotice::ObjectAlreadyExists {
                         name: name.item,
                         ty: "sink",
                     });
                 ctx.retire(Ok(ExecuteResponse::CreatedSink));
-                return;
             }
             Err(e) => {
                 ctx.retire(Err(e));
-                return;
             }
-        };
-
-        self.create_storage_export(global_id, &catalog_sink)
-            .await
-            .unwrap_or_terminate("cannot fail to create exports");
-
-        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
-            .await;
-
-        ctx.retire(Ok(ExecuteResponse::CreatedSink))
+        }
     }

     /// Validates that a view definition does not contain any expressions that may lead to

From 8e60b806a6514856b463e1c93828bf32075d55e4 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Mon, 15 Dec 2025 14:01:36 +0100
Subject: [PATCH 2/2] adapter: in sequencer(sink), factor out retire, make sure we always call it

---
 src/adapter/src/coord/sequencer/inner.rs | 37 ++++++++++++------------
 1 file changed, 19 insertions(+), 18 deletions(-)

diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs
index 8ee1ab6253d2c..e3528fe7b5e7d 100644
--- a/src/adapter/src/coord/sequencer/inner.rs
+++ b/src/adapter/src/coord/sequencer/inner.rs
@@ -1092,25 +1092,26 @@ impl Coordinator {
         let result = self.catalog_transact(Some(ctx.session()), ops).await;

-        match result {
-            Ok(()) => {
-                ctx.retire(Ok(ExecuteResponse::CreatedSink));
-            }
-            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
-                kind:
-                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
-            })) if if_not_exists => {
-                ctx.session()
-                    .add_notice(AdapterNotice::ObjectAlreadyExists {
-                        name: name.item,
-                        ty: "sink",
-                    });
-                ctx.retire(Ok(ExecuteResponse::CreatedSink));
-            }
-            Err(e) => {
-                ctx.retire(Err(e));
+        // Make sure we can't early-return and always retire the context.
+        let infallible = |result| -> Result<_, _> {
+            match result {
+                Ok(()) => Ok(ExecuteResponse::CreatedSink),
+                Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
+                    kind:
+                        mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
+                })) if if_not_exists => {
+                    ctx.session()
+                        .add_notice(AdapterNotice::ObjectAlreadyExists {
+                            name: name.item,
+                            ty: "sink",
+                        });
+                    Ok(ExecuteResponse::CreatedSink)
+                }
+                Err(e) => Err(e),
             }
-        }
+        };
+        let result = infallible(result);
+        ctx.retire(result);
     }

     /// Validates that a view definition does not contain any expressions that may lead to
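The second patch funnels every outcome of the catalog transaction into a single ctx.retire(...) call, so no match arm can return early and leave the execute context unretired. The standalone Rust sketch below shows the same pattern with simplified stand-ins; ExecuteResponse, AdapterError, and Ctx here are invented for the example and are not the real adapter types.

// Simplified stand-ins for the adapter types used in the patch.
#[derive(Debug)]
enum ExecuteResponse {
    CreatedSink,
}

#[derive(Debug)]
enum AdapterError {
    ItemAlreadyExists(String),
    Other(String),
}

// Stand-in for the execute context; every code path must retire it exactly once.
struct Ctx;

impl Ctx {
    fn retire(self, result: Result<ExecuteResponse, AdapterError>) {
        println!("retired with {:?}", result);
    }
}

fn finish_create_sink(ctx: Ctx, transact_result: Result<(), AdapterError>, if_not_exists: bool) {
    // Map the transaction outcome to a response inside a closure, so that no
    // branch can `return` past the single `ctx.retire(...)` call below.
    let to_response = |result: Result<(), AdapterError>| -> Result<ExecuteResponse, AdapterError> {
        match result {
            Ok(()) => Ok(ExecuteResponse::CreatedSink),
            // IF NOT EXISTS turns "already exists" into a success, as the
            // patch does after emitting a notice.
            Err(AdapterError::ItemAlreadyExists(_)) if if_not_exists => {
                Ok(ExecuteResponse::CreatedSink)
            }
            Err(e) => Err(e),
        }
    };

    ctx.retire(to_response(transact_result));
}

fn main() {
    finish_create_sink(Ctx, Ok(()), false);
    finish_create_sink(Ctx, Err(AdapterError::ItemAlreadyExists("s1".into())), true);
    finish_create_sink(Ctx, Err(AdapterError::Other("boom".into())), false);
}

Because the closure only maps the transaction result to a response, the one retire call after it runs on every path, which is exactly the invariant the patch is after.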