Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 345 additions & 4 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ use uuid::Uuid;
use crate::spec::{
EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
UnboundPartitionSpec, ViewFormatVersion, ViewMetadata, ViewMetadataBuilder,
ViewRepresentations, ViewVersion,
};
use crate::table::Table;
use crate::view::View;
use crate::{Error, ErrorKind, Result};

/// The catalog API for Iceberg Rust.
Expand Down Expand Up @@ -966,6 +968,125 @@ pub enum ViewUpdate {
},
}

impl ViewUpdate {
    /// Consumes this update and applies it to `builder`, returning the updated builder.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the underlying [`ViewMetadataBuilder`] call for
    /// the updates whose builder method is fallible.
    pub fn apply(self, builder: ViewMetadataBuilder) -> Result<ViewMetadataBuilder> {
        let updated = match self {
            // Builder calls that cannot fail.
            Self::AssignUuid { uuid } => builder.assign_uuid(uuid),
            Self::AddSchema { schema, .. } => builder.add_schema(schema),
            Self::SetLocation { location } => builder.set_location(location),
            Self::RemoveProperties { removals } => builder.remove_properties(&removals),
            // Builder calls that may reject the update.
            Self::UpgradeFormatVersion { format_version } => {
                builder.upgrade_format_version(format_version)?
            }
            Self::SetProperties { updates } => builder.set_properties(updates)?,
            Self::AddViewVersion { view_version } => builder.add_version(view_version)?,
            Self::SetCurrentViewVersion { view_version_id } => {
                builder.set_current_version_id(view_version_id)?
            }
        };

        Ok(updated)
    }
}

/// A set of requirements and updates to commit against a single view.
///
/// The builder's `build` method is restricted to `pub(crate)` since it's dangerous and
/// error-prone to construct [`ViewCommit`] directly.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct ViewCommit {
/// The view ident (views are addressed with a [`TableIdent`]).
ident: TableIdent,
/// The requirements of the view.
///
/// Commit will fail if the requirements are not met.
requirements: Vec<ViewRequirement>,
/// The updates of the view, applied in order.
updates: Vec<ViewUpdate>,
}

impl ViewCommit {
    /// Returns the identifier of the view this commit targets.
    pub fn identifier(&self) -> &TableIdent {
        &self.ident
    }

    /// Takes the requirements out of this commit.
    pub fn take_requirements(&mut self) -> Vec<ViewRequirement> {
        take(&mut self.requirements)
    }

    /// Takes the updates out of this commit.
    pub fn take_updates(&mut self) -> Vec<ViewUpdate> {
        take(&mut self.updates)
    }

    /// Applies this [`ViewCommit`] to `view` as part of a catalog update.
    ///
    /// Every requirement is checked against the view's current metadata first; the
    /// updates are then applied in order to a builder derived from that metadata.
    ///
    /// # Errors
    ///
    /// Fails on the first requirement that is not met, the first update that cannot
    /// be applied, or if the resulting metadata cannot be built.
    pub fn apply(self, view: View) -> Result<View> {
        // Validate all preconditions before touching the metadata.
        self.requirements
            .iter()
            .try_for_each(|requirement| requirement.check(Some(view.metadata())))?;

        // Thread the builder through each update, stopping at the first failure.
        let seed = view.metadata().clone().into_builder();
        let build_result = self
            .updates
            .into_iter()
            .try_fold(seed, |builder, update| update.apply(builder))?
            .build()?;

        Ok(view.with_metadata(Arc::new(build_result.metadata)))
    }
}

/// Requirements are used as preconditions for view commits to ensure
/// optimistic concurrency control.
///
/// Serialized with an internal `type` tag that names the variant
/// (for example `"assert-view-uuid"`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ViewRequirement {
/// The view UUID must match the requirement.
#[serde(rename = "assert-view-uuid")]
AssertViewUuid {
/// Expected UUID of the view.
uuid: Uuid,
},
}

impl ViewRequirement {
/// Check that the requirement is met by the view metadata.
/// If the requirement is not met, an appropriate error is returned.
/// Provide metadata as `None` if the view does not exist.
pub fn check(&self, metadata: Option<&ViewMetadata>) -> Result<()> {
if let Some(metadata) = metadata {
match self {
ViewRequirement::AssertViewUuid { uuid } => {
if metadata.uuid() != *uuid {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: View UUID does not match",
)
.with_context("expected", *uuid)
.with_context("found", metadata.uuid()));
}
}
}
} else {
match self {
ViewRequirement::AssertViewUuid { .. } => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: View does not exist",
));
}
}
}

Ok(())
}
}

mod _serde_set_statistics {
// The rest spec requires an additional field `snapshot-id`
// that is redundant with the `snapshot_id` field in the statistics file.
Expand Down Expand Up @@ -1025,17 +1146,18 @@ mod tests {
use serde::de::DeserializeOwned;
use uuid::uuid;

use super::ViewUpdate;
use super::{ViewCommit, ViewRequirement, ViewUpdate};
use crate::io::FileIOBuilder;
use crate::spec::{
BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
ViewVersion,
UnboundPartitionSpec, ViewFormatVersion, ViewMetadata, ViewMetadataBuilder,
ViewRepresentation, ViewRepresentations, ViewVersion, ViewVersionLog,
};
use crate::table::Table;
use crate::view::View;
use crate::{
NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
};
Expand Down Expand Up @@ -2405,4 +2527,223 @@ mod tests {
"s3://bucket/test/new_location/data",
);
}

/// Builds a fixed V1 [`ViewMetadata`] fixture with one schema, one version and a
/// single `comment` property.
fn test_view_metadata() -> ViewMetadata {
    let event_count =
        NestedField::optional(1, "event_count", Type::Primitive(PrimitiveType::Int))
            .with_doc("Count of events");
    let schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![std::sync::Arc::new(event_count)])
        .build()
        .unwrap();

    let sql_representation = SqlViewRepresentation {
        sql: "SELECT COUNT(1) FROM events".to_string(),
        dialect: "spark".to_string(),
    };
    let summary = HashMap::from_iter([("engine-name".to_string(), "Spark".to_string())]);
    let version = ViewVersion::builder()
        .with_version_id(1)
        .with_timestamp_ms(1573518431292)
        .with_schema_id(1)
        .with_default_catalog("prod".to_string().into())
        .with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
        .with_summary(summary)
        .with_representations(ViewRepresentations(vec![sql_representation.into()]))
        .build();

    ViewMetadata {
        format_version: ViewFormatVersion::V1,
        view_uuid: uuid!("fa6506c3-7681-40c8-86dc-e36561f83385"),
        location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
        current_version_id: 1,
        versions: HashMap::from_iter([(1, std::sync::Arc::new(version))]),
        version_log: vec![ViewVersionLog::new(1, 1573518431292)],
        schemas: HashMap::from_iter([(1, std::sync::Arc::new(schema))]),
        properties: HashMap::from_iter([(
            "comment".to_string(),
            "Daily event counts".to_string(),
        )]),
    }
}

/// Builds a [`View`] fixture named `ns.my_view` on top of [`test_view_metadata`].
fn test_view() -> View {
    let identifier = TableIdent::from_strs(["ns", "my_view"]).unwrap();

    View::builder()
        .metadata(test_view_metadata())
        .identifier(identifier)
        .metadata_location("s3://bucket/metadata/v1.metadata.json")
        .build()
        .unwrap()
}

#[test]
fn test_view_requirement_uuid_match() {
    let metadata = test_view_metadata();

    // Same UUID as the fixture: the requirement holds.
    let matching = ViewRequirement::AssertViewUuid {
        uuid: uuid!("fa6506c3-7681-40c8-86dc-e36561f83385"),
    };
    // Freshly generated UUID: the requirement must be rejected.
    let mismatched = ViewRequirement::AssertViewUuid {
        uuid: uuid::Uuid::now_v7(),
    };

    assert!(matching.check(Some(&metadata)).is_ok());
    assert!(mismatched.check(Some(&metadata)).is_err());
}

#[test]
fn test_view_requirement_view_not_exists() {
    // Checking against `None` models a view that does not exist.
    let outcome = ViewRequirement::AssertViewUuid {
        uuid: uuid!("fa6506c3-7681-40c8-86dc-e36561f83385"),
    }
    .check(None);

    assert!(outcome.is_err());
}

#[test]
fn test_view_requirement_serde() {
    // JSON form of the requirement, tagged by its `type` field.
    let json = r#"
{
"type": "assert-view-uuid",
"uuid": "fa6506c3-7681-40c8-86dc-e36561f83385"
}
"#;
    let expected = ViewRequirement::AssertViewUuid {
        uuid: uuid!("fa6506c3-7681-40c8-86dc-e36561f83385"),
    };

    test_serde_json(json, expected);
}

#[test]
fn test_view_update_apply_set_location() {
    let update = ViewUpdate::SetLocation {
        location: "s3://new/location".to_string(),
    };

    let build_result = update
        .apply(ViewMetadataBuilder::new_from_metadata(test_view_metadata()))
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(build_result.metadata.location(), "s3://new/location");
}

#[test]
fn test_view_update_apply_set_properties() {
    let update = ViewUpdate::SetProperties {
        updates: HashMap::from_iter([("key1".to_string(), "value1".to_string())]),
    };

    let updated = update
        .apply(ViewMetadataBuilder::new_from_metadata(test_view_metadata()))
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    let properties = updated.properties();

    // The new property is present.
    assert_eq!(properties.get("key1").map(String::as_str), Some("value1"));
    // The property that was already on the fixture is still there.
    assert_eq!(
        properties.get("comment").map(String::as_str),
        Some("Daily event counts")
    );
}

#[test]
fn test_view_update_apply_remove_properties() {
    let update = ViewUpdate::RemoveProperties {
        removals: vec!["comment".to_string()],
    };

    let updated = update
        .apply(ViewMetadataBuilder::new_from_metadata(test_view_metadata()))
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // The fixture's only property must be gone after the removal.
    assert!(!updated.properties().contains_key("comment"));
}

#[test]
fn test_view_update_apply_assign_uuid() {
    let new_uuid = uuid::Uuid::now_v7();
    let builder = ViewMetadataBuilder::new_from_metadata(test_view_metadata());

    let build_result = ViewUpdate::AssignUuid { uuid: new_uuid }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(build_result.metadata.uuid(), new_uuid);
}

#[test]
fn test_view_commit_apply() {
    let view = test_view();

    // Require the view's own UUID so the precondition is satisfied.
    let requirements = vec![ViewRequirement::AssertViewUuid {
        uuid: view.metadata().uuid(),
    }];
    let updates = vec![
        ViewUpdate::SetLocation {
            location: "s3://bucket/new_location".to_string(),
        },
        ViewUpdate::SetProperties {
            updates: HashMap::from_iter([("updated".to_string(), "true".to_string())]),
        },
    ];
    let commit = ViewCommit::builder()
        .ident(view.identifier().clone())
        .requirements(requirements)
        .updates(updates)
        .build();

    let updated_view = commit.apply(view).unwrap();

    assert_eq!(updated_view.location(), "s3://bucket/new_location");
    assert_eq!(
        updated_view.properties().get("updated").map(String::as_str),
        Some("true")
    );
}

#[test]
fn test_view_commit_apply_fails_on_uuid_mismatch() {
    let view = test_view();

    // A freshly generated UUID cannot match the fixture's UUID.
    let requirements = vec![ViewRequirement::AssertViewUuid {
        uuid: uuid::Uuid::now_v7(),
    }];
    let updates = vec![ViewUpdate::SetLocation {
        location: "s3://bucket/new_location".to_string(),
    }];
    let commit = ViewCommit::builder()
        .ident(view.identifier().clone())
        .requirements(requirements)
        .updates(updates)
        .build();

    let error = commit
        .apply(view)
        .expect_err("a commit with a mismatched UUID must be rejected");

    assert!(error.to_string().contains("View UUID does not match"));
}
}
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod catalog;
pub use catalog::*;

pub mod table;
pub mod view;

mod avro;
pub mod cache;
Expand Down
Loading
Loading