@@ -50,10 +50,11 @@ use mz_ore::str::StrExt;
 use mz_ore::task;
 use mz_repr::{CatalogItemId, GlobalId, RelationVersion, RelationVersionSelector, Timestamp};
 use mz_sql::plan::ConnectionDetails;
-use mz_storage_client::controller::{CollectionDescription, DataSource};
+use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
 use mz_storage_types::connections::PostgresConnection;
 use mz_storage_types::connections::inline::{InlinedConnection, IntoInlineConnection};
 use mz_storage_types::sinks::StorageSinkConnection;
+use mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC;
 use mz_storage_types::sources::{GenericSourceConnection, SourceExport, SourceExportDataConfig};
 use tracing::{Instrument, info_span, warn};
 
@@ -188,6 +189,7 @@ impl Coordinator {
         let mut storage_policies_to_initialize = BTreeMap::new();
         let mut execution_timestamps_to_set = BTreeSet::new();
         let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
+        let mut sinks_to_create: Vec<(CatalogItemId, Sink)> = vec![];
 
         // Replacing a materialized view causes the replacement's catalog entry
         // to be dropped. Its compute and storage collections are transferred to
@@ -301,13 +303,14 @@ impl Coordinator {
                 }
             }
             CatalogImplication::Sink(CatalogImplicationKind::Added(sink)) => {
-                tracing::debug!(?sink, "not handling AddSink in here yet");
+                sinks_to_create.push((catalog_id, sink));
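+                // Creation is deferred until all collections from this batch
+                // exist; see the sinks_to_create loop further down.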
             }
             CatalogImplication::Sink(CatalogImplicationKind::Altered {
-                prev: prev_sink,
-                new: new_sink,
+                prev: _prev_sink,
+                new: _new_sink,
             }) => {
-                tracing::debug!(?prev_sink, ?new_sink, "not handling AlterSink in here yet");
+                // Sink alterations are not yet supported; they would require
+                // re-creating the storage export with new parameters.
             }
             CatalogImplication::Sink(CatalogImplicationKind::Dropped(sink, full_name)) => {
                 storage_sink_gids_to_drop.push(sink.global_id());
@@ -559,6 +562,14 @@ impl Coordinator {
             self.create_table_collections(table_collections_to_create, execution_timestamps_to_set)
                 .await?;
         }
+
+        // Creating sinks (storage exports) must happen before initializing
+        // read policies, so that sink global_ids are included in
+        // storage_policies_to_initialize.
+        for (item_id, sink) in sinks_to_create {
+            self.handle_create_sink(&mut storage_policies_to_initialize, item_id, sink)
+                .await?;
+        }
+
         // It is _very_ important that we only initialize read policies after we
         // have created all the sources/collections. Some of the sources created
         // in this collection might have dependencies on other sources, so the
@@ -1446,6 +1457,95 @@ impl Coordinator {
             }
         }
     }
+
+    #[instrument(level = "debug")]
+    async fn handle_create_sink(
+        &mut self,
+        storage_policies_to_initialize: &mut BTreeMap<CompactionWindow, BTreeSet<GlobalId>>,
+        _item_id: CatalogItemId,
+        sink: Sink,
+    ) -> Result<(), AdapterError> {
+        let global_id = sink.global_id();
+
+        // Validate `sink.from` is in fact a storage collection.
+        self.controller.storage.check_exists(sink.from)?;
+
+        // The as_of determines the time at which we snapshot the persist
+        // collection when we start reading from it. This is primarily
+        // relevant when we do _not_ want to include the snapshot in the sink.
+        //
+        // We choose the smallest as_of that is legal according to the sinked
+        // collection's since.
+        let id_bundle = crate::CollectionIdBundle {
+            storage_ids: BTreeSet::from([sink.from]),
+            compute_ids: BTreeMap::new(),
+        };
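+        // compute_ids is empty because the sink reads directly from a storage
+        // collection (validated by check_exists above), so no compute
+        // collections need read holds.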
+
+        // We put read holds in place so that create_collections (below),
+        // which calls update_read_capabilities, can succeed. Otherwise the
+        // since of our dependencies might advance concurrently, pulling the
+        // rug out from under us!
+        //
+        // TODO: Maybe in the future, pass those holds on to storage, to hold
+        // on to them and downgrade when possible?
+        let read_holds = self.acquire_read_holds(&id_bundle);
+        let as_of = read_holds.least_valid_read();
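+        // least_valid_read is the earliest time at which all collections in
+        // the bundle can still be read, i.e. the join of their sinces.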
+
+        let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
+        let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
+            from: sink.from,
+            from_desc: storage_sink_from_entry
+                .relation_desc()
+                .expect("sinks can only be built on items with descs")
+                .into_owned(),
+            connection: sink
+                .connection
+                .clone()
+                .into_inline_connection(self.catalog().state()),
+            envelope: sink.envelope,
+            as_of,
+            with_snapshot: sink.with_snapshot,
+            version: sink.version,
+            from_storage_metadata: (),
+            to_storage_metadata: (),
+            commit_interval: sink.commit_interval,
+        };
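+        // The `()` storage metadata fields are placeholders; the storage
+        // controller is expected to fill in the real metadata when it
+        // instantiates the export.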
+
+        let collection_desc = CollectionDescription {
+            // TODO(sinks): make generic once we have more than one sink type.
+            desc: KAFKA_PROGRESS_DESC.clone(),
+            data_source: DataSource::Sink {
+                desc: ExportDescription {
+                    sink: storage_sink_desc,
+                    instance_id: sink.cluster_id,
+                },
+            },
+            since: None,
+            status_collection_id: None,
+            timeline: None,
+            primary: None,
+        };
+
+        // Create the collection.
+        let storage_metadata = self.catalog.state().storage_metadata();
+        self.controller
+            .storage
+            .create_collections(storage_metadata, None, vec![(global_id, collection_desc)])
+            .await
+            .unwrap_or_terminate("cannot fail to create exports");
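+        // unwrap_or_terminate treats a failure here as unrecoverable: by this
+        // point the catalog has already committed the sink, so export
+        // creation is expected to succeed.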
+
+        // Drop read holds after the export has been created, at which point
+        // storage will have put in its own read holds.
+        drop(read_holds);
+
+        // Initialize read policies for the sink.
+        storage_policies_to_initialize
+            .entry(CompactionWindow::Default)
+            .or_default()
+            .insert(global_id);
+
+        Ok(())
+    }
 }
 
 /// A state machine for building catalog implications from catalog updates.