Skip to content

Commit b3bc994

Browse files
committed
table lvt
1 parent 793b082 commit b3bc994

File tree

20 files changed

+334
-34
lines changed

20 files changed

+334
-34
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ build_exceptions! {
511511
IllegalUser(2218),
512512
}
513513

514-
// Database and Catalog Management Errors [2301-2317, 2321-2323]
514+
// Database and Catalog Management Errors [2301-2317, 2321-2327]
515515
build_exceptions! {
516516
/// Database already exists
517517
DatabaseAlreadyExists(2301),
@@ -551,6 +551,8 @@ build_exceptions! {
551551
RowAccessPolicyAlreadyExists(2324),
552552
/// General failures met while garbage collecting database meta
553553
GeneralDbGcFailure(2325),
554+
/// Table snapshot is expired
555+
TableSnapshotExpired(2327),
554556
}
555557

556558
// Stage and Connection Errors [2501-2505, 2510-2512]

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ use databend_common_meta_app::schema::SequenceIdent;
110110
use databend_common_meta_app::schema::SetSecurityPolicyAction;
111111
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
112112
use databend_common_meta_app::schema::SetTableRowAccessPolicyReq;
113+
use databend_common_meta_app::schema::SnapshotLvtCheck;
113114
use databend_common_meta_app::schema::SwapTableReq;
114115
use databend_common_meta_app::schema::TableCopiedFileInfo;
115116
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
@@ -2642,6 +2643,7 @@ impl SchemaApiTestSuite {
26422643
mt: &MT,
26432644
) -> anyhow::Result<()> {
26442645
let tenant_name = "tenant1";
2646+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
26452647
let db_name = "db1";
26462648
let tbl_name = "tb2";
26472649

@@ -2717,6 +2719,7 @@ impl SchemaApiTestSuite {
27172719
seq: MatchSeq::Exact(table_version),
27182720
new_table_meta: new_table_meta.clone(),
27192721
base_snapshot_location: None,
2722+
lvt_check: None,
27202723
};
27212724

27222725
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2742,6 +2745,7 @@ impl SchemaApiTestSuite {
27422745
seq: MatchSeq::Exact(table_version + 1),
27432746
new_table_meta: new_table_meta.clone(),
27442747
base_snapshot_location: None,
2748+
lvt_check: None,
27452749
};
27462750
let res = mt
27472751
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2768,6 +2772,7 @@ impl SchemaApiTestSuite {
27682772
seq: MatchSeq::Exact(table_version),
27692773
new_table_meta: new_table_meta.clone(),
27702774
base_snapshot_location: None,
2775+
lvt_check: None,
27712776
};
27722777
let res = mt
27732778
.update_multi_table_meta_with_sender(
@@ -2849,6 +2854,7 @@ impl SchemaApiTestSuite {
28492854
seq: MatchSeq::Exact(table_version),
28502855
new_table_meta: new_table_meta.clone(),
28512856
base_snapshot_location: None,
2857+
lvt_check: None,
28522858
};
28532859
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
28542860
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2899,6 +2905,7 @@ impl SchemaApiTestSuite {
28992905
seq: MatchSeq::Exact(table_version),
29002906
new_table_meta: new_table_meta.clone(),
29012907
base_snapshot_location: None,
2908+
lvt_check: None,
29022909
};
29032910
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
29042911
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2949,6 +2956,7 @@ impl SchemaApiTestSuite {
29492956
seq: MatchSeq::Exact(table_version),
29502957
new_table_meta: new_table_meta.clone(),
29512958
base_snapshot_location: None,
2959+
lvt_check: None,
29522960
};
29532961
let result = mt
29542962
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2961,6 +2969,67 @@ impl SchemaApiTestSuite {
29612969
let err = ErrorCode::from(err);
29622970
assert_eq!(ErrorCode::DUPLICATED_UPSERT_FILES, err.code());
29632971
}
2972+
2973+
info!("--- update table meta, snapshot_ts must respect LVT");
2974+
{
2975+
let table = util.get_table().await.unwrap();
2976+
let table_id = table.ident.table_id;
2977+
let lvt_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
2978+
let lvt_time = DateTime::<Utc>::from_timestamp(2_000, 0).unwrap();
2979+
mt.set_table_lvt(&lvt_ident, &LeastVisibleTime::new(lvt_time))
2980+
.await?;
2981+
2982+
// Snapshot older than LVT should be rejected.
2983+
let mut new_table_meta = table.meta.clone();
2984+
new_table_meta.comment = "lvt guard should fail".to_string();
2985+
let bad_snapshot_ts = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
2986+
let req = UpdateTableMetaReq {
2987+
table_id,
2988+
seq: MatchSeq::Exact(table.ident.seq),
2989+
new_table_meta: new_table_meta.clone(),
2990+
base_snapshot_location: None,
2991+
lvt_check: Some(SnapshotLvtCheck {
2992+
tenant: tenant.clone(),
2993+
timestamp: bad_snapshot_ts,
2994+
}),
2995+
};
2996+
let err = mt
2997+
.update_multi_table_meta(UpdateMultiTableMetaReq {
2998+
update_table_metas: vec![(req, table.as_ref().clone())],
2999+
..Default::default()
3000+
})
3001+
.await
3002+
.unwrap_err();
3003+
assert_eq!(
3004+
ErrorCode::TABLE_SNAPSHOT_EXPIRED,
3005+
ErrorCode::from(err).code()
3006+
);
3007+
3008+
// Snapshot newer than LVT should succeed.
3009+
let table = util.get_table().await.unwrap();
3010+
let mut ok_table_meta = table.meta.clone();
3011+
ok_table_meta.comment = "lvt guard success".to_string();
3012+
let ok_snapshot_ts = DateTime::<Utc>::from_timestamp(2_001, 0).unwrap();
3013+
let req = UpdateTableMetaReq {
3014+
table_id,
3015+
seq: MatchSeq::Exact(table.ident.seq),
3016+
new_table_meta: ok_table_meta.clone(),
3017+
base_snapshot_location: None,
3018+
lvt_check: Some(SnapshotLvtCheck {
3019+
tenant,
3020+
timestamp: ok_snapshot_ts,
3021+
}),
3022+
};
3023+
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
3024+
update_table_metas: vec![(req, table.as_ref().clone())],
3025+
..Default::default()
3026+
})
3027+
.await?
3028+
.unwrap();
3029+
3030+
let updated = util.get_table().await.unwrap();
3031+
assert_eq!(updated.meta.comment, "lvt guard success");
3032+
}
29643033
}
29653034
Ok(())
29663035
}
@@ -4300,6 +4369,7 @@ impl SchemaApiTestSuite {
43004369
seq: MatchSeq::Any,
43014370
new_table_meta: table_meta.clone(),
43024371
base_snapshot_location: None,
4372+
lvt_check: None,
43034373
};
43044374

43054375
let table = mt
@@ -4440,6 +4510,7 @@ impl SchemaApiTestSuite {
44404510
seq: MatchSeq::Any,
44414511
new_table_meta: create_table_meta.clone(),
44424512
base_snapshot_location: None,
4513+
lvt_check: None,
44434514
};
44444515

44454516
let table = mt
@@ -6237,6 +6308,7 @@ impl SchemaApiTestSuite {
62376308
seq: MatchSeq::Any,
62386309
new_table_meta: table_meta(created_on),
62396310
base_snapshot_location: None,
6311+
lvt_check: None,
62406312
};
62416313

62426314
let table = mt
@@ -6288,6 +6360,7 @@ impl SchemaApiTestSuite {
62886360
seq: MatchSeq::Any,
62896361
new_table_meta: table_meta(created_on),
62906362
base_snapshot_location: None,
6363+
lvt_check: None,
62916364
};
62926365

62936366
let table = mt
@@ -7801,6 +7874,7 @@ impl SchemaApiTestSuite {
78017874
seq: MatchSeq::Any,
78027875
new_table_meta: table_meta(created_on),
78037876
base_snapshot_location: None,
7877+
lvt_check: None,
78047878
};
78057879

78067880
let table = mt
@@ -7860,6 +7934,7 @@ impl SchemaApiTestSuite {
78607934
seq: MatchSeq::Any,
78617935
new_table_meta: table_meta(created_on),
78627936
base_snapshot_location: None,
7937+
lvt_check: None,
78637938
};
78647939

78657940
let table = mt
@@ -7916,6 +7991,7 @@ impl SchemaApiTestSuite {
79167991
seq: MatchSeq::Any,
79177992
new_table_meta: table_meta(created_on),
79187993
base_snapshot_location: None,
7994+
lvt_check: None,
79197995
};
79207996

79217997
let table = mt
@@ -8370,6 +8446,7 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83708446
seq: MatchSeq::Any,
83718447
new_table_meta: self.table_meta(),
83728448
base_snapshot_location: None,
8449+
lvt_check: None,
83738450
};
83748451

83758452
let req = UpdateMultiTableMetaReq {

src/meta/api/src/table_api.rs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
3131
use databend_common_meta_app::app_error::StreamAlreadyExists;
3232
use databend_common_meta_app::app_error::StreamVersionMismatched;
3333
use databend_common_meta_app::app_error::TableAlreadyExists;
34+
use databend_common_meta_app::app_error::TableSnapshotExpired;
3435
use databend_common_meta_app::app_error::TableVersionMismatched;
3536
use databend_common_meta_app::app_error::UndropTableHasNoHistory;
3637
use databend_common_meta_app::app_error::UndropTableWithNoDropTime;
@@ -97,11 +98,13 @@ use databend_common_meta_kvapi::kvapi::KvApiExt;
9798
use databend_common_meta_types::protobuf as pb;
9899
use databend_common_meta_types::txn_op::Request;
99100
use databend_common_meta_types::txn_op_response::Response;
101+
use databend_common_meta_types::ConditionResult;
100102
use databend_common_meta_types::ConditionResult::Eq;
101103
use databend_common_meta_types::MatchSeqExt;
102104
use databend_common_meta_types::MetaError;
103105
use databend_common_meta_types::MetaId;
104106
use databend_common_meta_types::SeqV;
107+
use databend_common_meta_types::TxnCondition;
105108
use databend_common_meta_types::TxnGetRequest;
106109
use databend_common_meta_types::TxnGetResponse;
107110
use databend_common_meta_types::TxnOp;
@@ -1198,19 +1201,19 @@ where
11981201
})
11991202
.collect::<Vec<_>>();
12001203
let mut tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(self, &tid_vec).await?;
1201-
for (req, (tb_meta_seq, table_meta)) in
1204+
for ((req, _), (tb_meta_seq, table_meta)) in
12021205
update_table_metas.iter().zip(tb_meta_vec.iter_mut())
12031206
{
1204-
let req_seq = req.0.seq;
1207+
let req_seq = req.seq;
12051208

12061209
if *tb_meta_seq == 0 || table_meta.is_none() {
12071210
return Err(KVAppError::AppError(AppError::UnknownTableId(
1208-
UnknownTableId::new(req.0.table_id, "update_multi_table_meta"),
1211+
UnknownTableId::new(req.table_id, "update_multi_table_meta"),
12091212
)));
12101213
}
12111214
if req_seq.match_seq(tb_meta_seq).is_err() {
12121215
mismatched_tbs.push((
1213-
req.0.table_id,
1216+
req.table_id,
12141217
*tb_meta_seq,
12151218
std::mem::take(table_meta).unwrap(),
12161219
));
@@ -1222,27 +1225,39 @@ where
12221225
}
12231226

12241227
let mut new_table_meta_map: BTreeMap<u64, TableMeta> = BTreeMap::new();
1225-
for (req, (tb_meta_seq, table_meta)) in
1228+
for ((req, _), (tb_meta_seq, table_meta)) in
12261229
update_table_metas.iter_mut().zip(tb_meta_vec.iter())
12271230
{
12281231
let tbid = TableId {
1229-
table_id: req.0.table_id,
1232+
table_id: req.table_id,
12301233
};
12311234
// `update_table_meta` MUST NOT modify `shared_by` field
12321235
let table_meta = table_meta.as_ref().unwrap();
12331236

1234-
let mut new_table_meta = req.0.new_table_meta.clone();
1237+
let mut new_table_meta = req.new_table_meta.clone();
12351238
new_table_meta.shared_by = table_meta.shared_by.clone();
12361239

1237-
tbl_seqs.insert(req.0.table_id, *tb_meta_seq);
1240+
tbl_seqs.insert(req.table_id, *tb_meta_seq);
12381241
txn.condition.push(txn_cond_seq(&tbid, Eq, *tb_meta_seq));
1242+
1243+
// Add LVT check if provided
1244+
if let Some(check) = req.lvt_check.as_ref() {
1245+
let lvt_ident = LeastVisibleTimeIdent::new(&check.tenant, req.table_id);
1246+
let check_lvt_value = LeastVisibleTime::new(check.timestamp);
1247+
txn.condition.push(TxnCondition::match_value(
1248+
lvt_ident.to_string_key(),
1249+
ConditionResult::Le,
1250+
serialize_struct(&check_lvt_value)?,
1251+
));
1252+
}
1253+
12391254
txn.if_then
12401255
.push(txn_op_put(&tbid, serialize_struct(&new_table_meta)?));
12411256
txn.else_then.push(TxnOp {
12421257
request: Some(Request::Get(TxnGetRequest::new(tbid.to_string_key()))),
12431258
});
12441259

1245-
new_table_meta_map.insert(req.0.table_id, new_table_meta);
1260+
new_table_meta_map.insert(req.table_id, new_table_meta);
12461261
}
12471262

12481263
// `remove_table_copied_files` and `upsert_table_copied_file_info`
@@ -1382,6 +1397,27 @@ where
13821397
),
13831398
)))
13841399
} else {
1400+
// Check if the transaction failed due to LVT check
1401+
if update_table_metas.len() == 1 {
1402+
let req = &update_table_metas[0].0;
1403+
if let Some(check) = req.lvt_check.as_ref() {
1404+
let lvt_ident = LeastVisibleTimeIdent::new(&check.tenant, req.table_id);
1405+
let current_lvt: Option<SeqV<LeastVisibleTime>> =
1406+
self.get_pb(&lvt_ident).await?;
1407+
if let Some(lvt_data) = current_lvt {
1408+
let lvt_time = lvt_data.data.time;
1409+
let snapshot_ts = check.timestamp;
1410+
1411+
// If LVT > snapshot_ts, the snapshot has expired
1412+
if lvt_time > snapshot_ts {
1413+
return Err(KVAppError::AppError(AppError::TableSnapshotExpired(
1414+
TableSnapshotExpired::new(req.table_id, snapshot_ts, lvt_time),
1415+
)));
1416+
}
1417+
}
1418+
}
1419+
}
1420+
13851421
// If every table version matches but the transaction still failed, the cause is unknown, so just return an error
13861422
Err(KVAppError::AppError(AppError::from(
13871423
MultiStmtTxnCommitFailed::new("update_multi_table_meta"),

src/meta/app/src/app_error.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,24 @@ impl TableLockExpired {
863863
}
864864
}
865865

866+
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
867+
#[error("Snapshot timestamp {snapshot_ts} for table {table_id} is older than the table's least visible time {lvt}")]
868+
pub struct TableSnapshotExpired {
869+
table_id: u64,
870+
snapshot_ts: DateTime<Utc>,
871+
lvt: DateTime<Utc>,
872+
}
873+
874+
impl TableSnapshotExpired {
875+
pub fn new(table_id: u64, snapshot_ts: DateTime<Utc>, lvt: DateTime<Utc>) -> Self {
876+
Self {
877+
table_id,
878+
snapshot_ts,
879+
lvt,
880+
}
881+
}
882+
}
883+
866884
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
867885
#[error(
868886
"CannotShareDatabaseCreatedFromShare: cannot share database {database_name} which created from share while {context}"
@@ -1125,6 +1143,9 @@ pub enum AppError {
11251143
#[error(transparent)]
11261144
TableLockExpired(#[from] TableLockExpired),
11271145

1146+
#[error(transparent)]
1147+
TableSnapshotExpired(#[from] TableSnapshotExpired),
1148+
11281149
#[error(transparent)]
11291150
CannotShareDatabaseCreatedFromShare(#[from] CannotShareDatabaseCreatedFromShare),
11301151

@@ -1472,6 +1493,15 @@ impl AppErrorMessage for TableLockExpired {
14721493
}
14731494
}
14741495

1496+
impl AppErrorMessage for TableSnapshotExpired {
1497+
fn message(&self) -> String {
1498+
format!(
1499+
"Snapshot timestamp {} for table {} is older than the table's least visible time {}",
1500+
self.snapshot_ts, self.table_id, self.lvt,
1501+
)
1502+
}
1503+
}
1504+
14751505
impl AppErrorMessage for CannotShareDatabaseCreatedFromShare {
14761506
fn message(&self) -> String {
14771507
format!(
@@ -1688,6 +1718,7 @@ impl From<AppError> for ErrorCode {
16881718
ErrorCode::UnknownShareEndpointId(err.message())
16891719
}
16901720
AppError::TableLockExpired(err) => ErrorCode::TableLockExpired(err.message()),
1721+
AppError::TableSnapshotExpired(err) => ErrorCode::TableSnapshotExpired(err.message()),
16911722
AppError::CannotShareDatabaseCreatedFromShare(err) => {
16921723
ErrorCode::CannotShareDatabaseCreatedFromShare(err.message())
16931724
}

src/meta/app/src/schema/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub use table::SetTableColumnMaskPolicyReply;
125125
pub use table::SetTableColumnMaskPolicyReq;
126126
pub use table::SetTableRowAccessPolicyReply;
127127
pub use table::SetTableRowAccessPolicyReq;
128+
pub use table::SnapshotLvtCheck;
128129
pub use table::SnapshotRef;
129130
pub use table::SnapshotRefType;
130131
pub use table::SwapTableReply;

0 commit comments

Comments
 (0)