102 changes: 100 additions & 2 deletions src/adapter/src/coord/catalog_implications.rs
@@ -50,10 +50,11 @@ use mz_ore::str::StrExt;
use mz_ore::task;
use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
use mz_sql::plan::ConnectionDetails;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
use mz_storage_types::sinks::StorageSinkConnection;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
use tracing::{Instrument, info_span, warn};

@@ -188,6 +189,7 @@ impl Coordinator {
let mut storage_policies_to_initialize = BTreeMap::new();
let mut execution_timestamps_to_set = BTreeSet::new();
let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
let mut sinks_to_create: Vec<(CatalogItemId, Sink)> = vec![];

// Replacing a materialized view causes the replacement's catalog entry
// to be dropped. Its compute and storage collections are transferred to
Expand Down Expand Up @@ -301,7 +303,7 @@ impl Coordinator {
}
}
CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
tracing::debug!(?sink, "not handling AddSink in here yet");
sinks_to_create.push((catalog_id, sink));
}
CatalogImplication::Sink(CatalogImplicationKind::Altered {
prev: prev_sink,
@@ -559,6 +561,14 @@
self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
.await?;
}

// Create sinks (storage exports) - must happen before initializing read
// policies so sink global_ids are included in storage_policies_to_initialize.
for (_item_id, sink) in sinks_to_create {
self.handle_create_sink(&mut storage_policies_to_initialize, sink)
.await?;
}

// It is _very_ important that we only initialize read policies after we
// have created all the sources/collections. Some of the sources created
// in this collection might have dependencies on other sources, so the
@@ -1446,6 +1456,94 @@ impl Coordinator {
}
}
}

#[instrument(level = "debug")]
async fn handle_create_sink(
&mut self,
storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
sink: Sink,
) -> Result<(), AdapterError> {
let global_id = sink.global_id();

// Validate `sink.from` is in fact a storage collection
self.controller.storage.check_exists(sink.from)?;

// The AsOf is used to determine at what time to snapshot reading from
// the persist collection. This is primarily relevant when we do _not_
// want to include the snapshot in the sink.
//
// We choose the smallest as_of that is legal, according to the sinked
// collection's since.
let id_bundle = crate::CollectionIdBundle {
storage_ids: BTreeSet::from([sink.from]),
compute_ids: BTreeMap::new(),
};

// We're putting in place read holds, such that create_collections, below,
// which calls update_read_capabilities, can successfully do so.
// Otherwise, the since of dependencies might move along concurrently,
// pulling the rug from under us!

// TODO: Maybe in the future, pass those holds on to storage, to hold on
// to them and downgrade when possible?
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = read_holds.least_valid_read();

let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
from: sink.from,
from_desc: storage_sink_from_entry
.relation_desc()
.expect("sinks can only be built on items with descs")
.into_owned(),
connection: sink
.connection
.clone()
.into_inline_connection(self.catalog().state()),
envelope: sink.envelope,
as_of,
with_snapshot: sink.with_snapshot,
version: sink.version,
from_storage_metadata: (),
to_storage_metadata: (),
commit_interval: sink.commit_interval,
};

let collection_desc = CollectionDescription {
// TODO(sinks): make generic once we have more than one sink type.
desc: KAFKA_PROGRESS_DESC.clone(),
data_source: DataSource::Sink {
desc: ExportDescription {
sink: storage_sink_desc,
instance_id: sink.cluster_id,
},
},
since: None,
status_collection_id: None,
timeline: None,
primary: None,
};

// Create the collection.
let storage_metadata = self.catalog.state().storage_metadata();
self.controller
.storage
.create_collections(storage_metadata, None, vec![(global_id, collection_desc)])
.await
.unwrap_or_terminate("cannot fail to create exports");

// Drop read holds after the export has been created, at which point
// storage will have put in its own read holds.
drop(read_holds);

// Initialize read policies for the sink
storage_policies_to_initialize
.entry(CompactionWindow::Default)
.or_default()
.insert(global_id);

Ok(())
}
}

/// A state machine for building catalog implications from catalog updates.
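The core of the logic moved into `handle_create_sink` is the read-hold dance: acquire holds on the sink's input collection, derive the sink's `as_of` from the least valid read, register the export, and only then release the holds, by which point the storage controller holds its own read capability. The following is a minimal, self-contained sketch of that ordering using toy types; `SinceTracker`, `ReadHold`, `least_valid_read`, and `try_advance` are hypothetical stand-ins for illustration, not Materialize APIs:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Toy model of a collection's `since` frontier plus outstanding read holds.
struct SinceTracker {
    since: u64,
    holds: Vec<u64>, // times at which readers hold the frontier back
}

impl SinceTracker {
    /// The earliest time a new reader may read from.
    fn least_valid_read(&self) -> u64 {
        self.since
    }

    /// Compaction may only advance `since` up to the earliest outstanding hold.
    fn try_advance(&mut self, to: u64) {
        let floor = self.holds.iter().copied().min().unwrap_or(u64::MAX);
        self.since = to.min(floor).max(self.since);
    }
}

/// An RAII-style read hold: while alive, the tracker cannot compact past `at`.
struct ReadHold {
    tracker: Rc<RefCell<SinceTracker>>,
    at: u64,
}

impl ReadHold {
    fn acquire(tracker: Rc<RefCell<SinceTracker>>) -> Self {
        let at = tracker.borrow().least_valid_read();
        tracker.borrow_mut().holds.push(at);
        ReadHold { tracker, at }
    }
}

impl Drop for ReadHold {
    fn drop(&mut self) {
        let mut t = self.tracker.borrow_mut();
        if let Some(pos) = t.holds.iter().position(|h| *h == self.at) {
            t.holds.swap_remove(pos);
        }
    }
}

fn main() {
    let input = Rc::new(RefCell::new(SinceTracker { since: 5, holds: vec![] }));

    // 1. Hold the input's since before registering the export ...
    let hold = ReadHold::acquire(Rc::clone(&input));
    let as_of = hold.at;

    // 2. ... so concurrent compaction cannot move past the chosen as_of.
    input.borrow_mut().try_advance(10);
    assert_eq!(input.borrow().since, 5);
    assert!(as_of >= input.borrow().least_valid_read(), "as_of stays readable");

    // 3. Once the export is registered (and holds its own read capability),
    //    the temporary hold can be released and compaction may proceed.
    drop(hold);
    input.borrow_mut().try_advance(10);
    assert_eq!(input.borrow().since, 10);
    println!("export as_of = {as_of}");
}
```

The same ordering explains why the diff inserts the sink's `GlobalId` into `storage_policies_to_initialize` only after `create_collections` succeeds: the read policy takes over once storage has installed its own holds.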
86 changes: 1 addition & 85 deletions src/adapter/src/coord/ddl.rs
@@ -16,12 +16,11 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use fail::fail_point;
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::SINCE_GRANULARITY;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::VersionedEvent;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Sink};
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc};
use mz_cluster_client::ReplicaId;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::ClusterId;
@@ -42,10 +41,7 @@ use mz_sql::session::vars::{
MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
MAX_SOURCES, MAX_SQL_SERVER_CONNECTIONS, MAX_TABLES, SystemVars, Var,
};
use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
use serde_json::json;
use tracing::{Instrument, Level, event, info_span, warn};

@@ -941,86 +937,6 @@ impl Coordinator {
.set_limit(webhook_request_limit);
}

pub(crate) async fn create_storage_export(
&mut self,
id: GlobalId,
sink: &Sink,
) -> Result<(), AdapterError> {
// Validate `sink.from` is in fact a storage collection
self.controller.storage.check_exists(sink.from)?;

// The AsOf is used to determine at what time to snapshot reading from
// the persist collection. This is primarily relevant when we do _not_
// want to include the snapshot in the sink.
//
// We choose the smallest as_of that is legal, according to the sinked
// collection's since.
let id_bundle = crate::CollectionIdBundle {
storage_ids: btreeset! {sink.from},
compute_ids: btreemap! {},
};

// We're putting in place read holds, such that create_exports, below,
// which calls update_read_capabilities, can successfully do so.
// Otherwise, the since of dependencies might move along concurrently,
// pulling the rug from under us!
//
// TODO: Maybe in the future, pass those holds on to storage, to hold on
// to them and downgrade when possible?
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = read_holds.least_valid_read();

let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
from: sink.from,
from_desc: storage_sink_from_entry
.relation_desc()
.expect("sinks can only be built on items with descs")
.into_owned(),
connection: sink
.connection
.clone()
.into_inline_connection(self.catalog().state()),
envelope: sink.envelope,
as_of,
with_snapshot: sink.with_snapshot,
version: sink.version,
from_storage_metadata: (),
to_storage_metadata: (),
commit_interval: sink.commit_interval,
};

let collection_desc = CollectionDescription {
// TODO(sinks): make generic once we have more than one sink type.
desc: KAFKA_PROGRESS_DESC.clone(),
data_source: DataSource::Sink {
desc: ExportDescription {
sink: storage_sink_desc,
instance_id: sink.cluster_id,
},
},
since: None,
status_collection_id: None,
timeline: None,
primary: None,
};
let collections = vec![(id, collection_desc)];

// Create the collections.
let storage_metadata = self.catalog.state().storage_metadata();
let res = self
.controller
.storage
.create_collections(storage_metadata, None, collections)
.await;

// Drop read holds after the export has been created, at which point
// storage will have put in its own read holds.
drop(read_holds);

Ok(res?)
}

/// Validate all resource limits in a catalog transaction and return an error if that limit is
/// exceeded.
fn validate_resource_limits(
45 changes: 18 additions & 27 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -18,7 +18,6 @@ use anyhow::anyhow;
use futures::future::{BoxFuture, FutureExt};
use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_catalog::memory::objects::{
@@ -1093,34 +1092,26 @@ impl Coordinator {

let result = self.catalog_transact(Some(ctx.session()), ops).await;

match result {
Ok(()) => {}
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
name: name.item,
ty: "sink",
});
ctx.retire(Ok(ExecuteResponse::CreatedSink));
return;
}
Err(e) => {
ctx.retire(Err(e));
return;
// Make sure we can't early-return and always retire the context.
let infallible = |result| -> Result<_, _> {
match result {
Ok(()) => Ok(ExecuteResponse::CreatedSink),
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind:
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
ctx.session()
.add_notice(AdapterNotice::ObjectAlreadyExists {
name: name.item,
ty: "sink",
});
Ok(ExecuteResponse::CreatedSink)
}
Err(e) => Err(e),
}
};

self.create_storage_export(global_id, &catalog_sink)
.await
.unwrap_or_terminate("cannot fail to create exports");

self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
.await;

ctx.retire(Ok(ExecuteResponse::CreatedSink))
let result = infallible(result);
ctx.retire(result);
}

/// Validates that a view definition does not contain any expressions that may lead to
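The rewritten tail of `sequence_create_sink` replaces early `return`s with a closure that folds every outcome into a single `Result`, so `ctx.retire` runs exactly once on every path. Below is a minimal sketch of the same shape, with toy `Ctx`, `Response`, and `Error` types standing in for the adapter's real ones (assumptions for illustration only):

```rust
#[derive(Debug)]
enum Error {
    AlreadyExists(String),
    Other(String),
}

#[derive(Debug)]
enum Response {
    CreatedSink,
}

struct Ctx;

impl Ctx {
    /// In the real coordinator, retiring the context sends the final result
    /// back to the client; it must happen exactly once.
    fn retire(self, result: Result<Response, Error>) {
        println!("retired with {result:?}");
    }
}

fn sequence_create_sink(ctx: Ctx, result: Result<(), Error>, if_not_exists: bool) {
    // Map every outcome to a single Result so no branch can return early and
    // skip (or duplicate) the final `ctx.retire`.
    let infallible = |result: Result<(), Error>| -> Result<Response, Error> {
        match result {
            Ok(()) => Ok(Response::CreatedSink),
            // "IF NOT EXISTS" downgrades the duplicate-name error to success.
            Err(Error::AlreadyExists(_name)) if if_not_exists => Ok(Response::CreatedSink),
            Err(e) => Err(e),
        }
    };

    ctx.retire(infallible(result));
}

fn main() {
    sequence_create_sink(Ctx, Ok(()), false);
    sequence_create_sink(Ctx, Err(Error::AlreadyExists("s".into())), true);
    sequence_create_sink(Ctx, Err(Error::Other("boom".into())), false);
}
```

This shape also makes the removal of the inline `create_storage_export` and read-policy calls safe: sink creation now happens during catalog-implication handling, so the sequencer only has to report the transaction's outcome.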