Skip to content

Commit e501c3a

Browse files
authored
fix: error in multi statement transaction retry (#18934)
* fix: error in multi statement transaction retry
* add test
* fix
* fix
* fix
* add defensive check
* fix
1 parent b02cc59 commit e501c3a

File tree

7 files changed

+285
-14
lines changed

7 files changed

+285
-14
lines changed

src/query/storages/common/table_meta/src/meta/v4/snapshot.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
1516
use std::io::Cursor;
1617
use std::io::Read;
1718
use std::sync::Arc;
@@ -143,6 +144,8 @@ impl TableSnapshot {
143144
return Err(ErrorCode::TransactionTimeout(err_msg));
144145
}
145146

147+
ensure_segments_unique(&segments)?;
148+
146149
Ok(Self {
147150
format_version: TableSnapshot::VERSION,
148151
snapshot_id: uuid_from_date_time(snapshot_timestamp_adjusted),
@@ -244,8 +247,11 @@ impl TableSnapshot {
244247
let compression = MetaCompression::try_from(r.read_scalar::<u8>()?)?;
245248
let snapshot_size: u64 = r.read_scalar::<u64>()?;
246249

247-
read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)
248-
.map_err(|x| x.add_message("fail to deserialize table snapshot"))
250+
let snapshot: TableSnapshot =
251+
read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)
252+
.map_err(|x| x.add_message("fail to deserialize table snapshot"))?;
253+
snapshot.ensure_segments_unique()?;
254+
Ok(snapshot)
249255
}
250256

251257
#[inline]
@@ -257,11 +263,36 @@ impl TableSnapshot {
257263
pub fn table_statistics_location(&self) -> Option<String> {
258264
self.table_statistics_location.clone()
259265
}
266+
267+
#[inline]
268+
pub fn ensure_segments_unique(&self) -> Result<()> {
269+
ensure_segments_unique(&self.segments)
270+
}
271+
}
272+
273+
fn ensure_segments_unique(segments: &[Location]) -> Result<()> {
274+
if segments.len() < 2 {
275+
return Ok(());
276+
}
277+
278+
let mut seen = HashSet::with_capacity(segments.len());
279+
for loc in segments {
280+
let key = loc.0.as_str();
281+
if !seen.insert(key) {
282+
log::warn!(
283+
"duplicate segment location {} detected while constructing snapshot",
284+
key
285+
);
286+
}
287+
}
288+
Ok(())
260289
}
261290

262291
// use the chain of converters, for versions before v3
263292
impl From<v2::TableSnapshot> for TableSnapshot {
264293
fn from(s: v2::TableSnapshot) -> Self {
294+
ensure_segments_unique(&s.segments)
295+
.expect("duplicate segment location found while converting snapshot from v2");
265296
Self {
266297
// NOTE: it is important to let the format_version return from here
267298
// carries the format_version of snapshot being converted.
@@ -284,6 +315,8 @@ where T: Into<v3::TableSnapshot>
284315
{
285316
fn from(s: T) -> Self {
286317
let s: v3::TableSnapshot = s.into();
318+
ensure_segments_unique(&s.segments)
319+
.expect("duplicate segment location found while converting snapshot from v3");
287320
Self {
288321
// NOTE: it is important to let the format_version return from here
289322
// carries the format_version of snapshot being converted.

src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ async fn build_update_table_meta_req(
280280
table_meta_timestamps,
281281
table_stats_gen,
282282
)?;
283+
snapshot.ensure_segments_unique()?;
283284

284285
// write snapshot
285286
let dal = fuse_table.get_operator();

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
527527
snapshot,
528528
table_info,
529529
} => {
530+
snapshot.ensure_segments_unique()?;
530531
let location = self
531532
.location_gen
532533
.snapshot_location_from_uuid(&snapshot.snapshot_id, TableSnapshot::VERSION)?;

src/query/storages/fuse/src/retry/commit.rs

Lines changed: 106 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617
use std::time::Instant;
1718

@@ -39,6 +40,8 @@ use crate::statistics::merge_statistics;
3940
use crate::statistics::reducers::deduct_statistics;
4041
use crate::FuseTable;
4142

43+
const FUSE_ENGINE: &str = "FUSE";
44+
4245
pub async fn commit_with_backoff(
4346
ctx: Arc<dyn TableContext>,
4447
mut req: UpdateMultiTableMetaReq,
@@ -47,6 +50,13 @@ pub async fn commit_with_backoff(
4750
let mut backoff = set_backoff(None, None, None);
4851
let mut retries = 0;
4952

53+
// Compute segments diff for all tables before entering the retry loop.
54+
// This diff represents the actual changes made by the transaction (base -> txn_generated),
55+
// and remains constant across all retries.
56+
// Also cache the original snapshots for statistics merging.
57+
let (table_segments_diffs, table_original_snapshots) =
58+
compute_table_segments_diffs(ctx.clone(), &req).await?;
59+
5060
loop {
5161
let ret = catalog
5262
.retryable_update_multi_table_meta(req.clone())
@@ -63,14 +73,88 @@ pub async fn commit_with_backoff(
6373
};
6474
sleep(duration).await;
6575
retries += 1;
66-
try_rebuild_req(ctx.clone(), &mut req, update_failed_tbls).await?;
76+
try_rebuild_req(
77+
ctx.clone(),
78+
&mut req,
79+
update_failed_tbls,
80+
&table_segments_diffs,
81+
&table_original_snapshots,
82+
)
83+
.await?;
6784
}
6885
}
6986

87+
async fn compute_table_segments_diffs(
88+
ctx: Arc<dyn TableContext>,
89+
req: &UpdateMultiTableMetaReq,
90+
) -> Result<(
91+
HashMap<u64, SegmentsDiff>,
92+
HashMap<u64, Option<Arc<TableSnapshot>>>,
93+
)> {
94+
let txn_mgr = ctx.txn_mgr();
95+
let storage_class = ctx.get_settings().get_s3_storage_class()?;
96+
let mut table_segments_diffs = HashMap::new();
97+
let mut table_original_snapshots = HashMap::new();
98+
99+
for (update_table_meta_req, _) in &req.update_table_metas {
100+
let tid = update_table_meta_req.table_id;
101+
let engine = update_table_meta_req.new_table_meta.engine.as_str();
102+
103+
if engine != FUSE_ENGINE {
104+
log::info!(
105+
"Skipping segments diff pre-compute for table {} with engine {}",
106+
tid,
107+
engine
108+
);
109+
continue;
110+
}
111+
112+
// Read the base snapshot (snapshot at transaction begin)
113+
let base_snapshot_location = txn_mgr.lock().get_base_snapshot_location(tid);
114+
115+
// Read the transaction-generated snapshot (original snapshot before any merge)
116+
let new_table = FuseTable::from_table_meta(
117+
update_table_meta_req.table_id,
118+
0,
119+
update_table_meta_req.new_table_meta.clone(),
120+
storage_class,
121+
)?;
122+
123+
let base_snapshot = new_table
124+
.read_table_snapshot_with_location(base_snapshot_location)
125+
.await?;
126+
let new_snapshot = new_table.read_table_snapshot().await?;
127+
128+
let base_segments = base_snapshot
129+
.as_ref()
130+
.map(|s| s.segments.as_slice())
131+
.unwrap_or(&[]);
132+
let new_segments = new_snapshot
133+
.as_ref()
134+
.map(|s| s.segments.as_slice())
135+
.unwrap_or(&[]);
136+
137+
info!(
138+
"Computing segments diff for table {} (base: {} segments, txn: {} segments)",
139+
tid,
140+
base_segments.len(),
141+
new_segments.len()
142+
);
143+
144+
let diff = SegmentsDiff::new(base_segments, new_segments);
145+
table_segments_diffs.insert(tid, diff);
146+
table_original_snapshots.insert(tid, new_snapshot);
147+
}
148+
149+
Ok((table_segments_diffs, table_original_snapshots))
150+
}
151+
70152
async fn try_rebuild_req(
71153
ctx: Arc<dyn TableContext>,
72154
req: &mut UpdateMultiTableMetaReq,
73155
update_failed_tbls: Vec<(u64, u64, TableMeta)>,
156+
table_segments_diffs: &HashMap<u64, SegmentsDiff>,
157+
table_original_snapshots: &HashMap<u64, Option<Arc<TableSnapshot>>>,
74158
) -> Result<()> {
75159
info!(
76160
"try_rebuild_req: update_failed_tbls={:?}",
@@ -98,26 +182,35 @@ async fn try_rebuild_req(
98182
.iter_mut()
99183
.find(|(meta, _)| meta.table_id == tid)
100184
.unwrap();
101-
let new_table = FuseTable::from_table_meta(
102-
update_table_meta_req.table_id,
103-
0,
104-
update_table_meta_req.new_table_meta.clone(),
105-
storage_class,
106-
)?;
107-
let new_snapshot = new_table.read_table_snapshot().await?;
185+
108186
let base_snapshot_location = txn_mgr.lock().get_base_snapshot_location(tid);
109-
let base_snapshot = new_table
110-
.read_table_snapshot_with_location(base_snapshot_location)
187+
let base_snapshot = latest_table
188+
.read_table_snapshot_with_location(base_snapshot_location.clone())
111189
.await?;
112190

113-
let segments_diff = SegmentsDiff::new(base_snapshot.segments(), new_snapshot.segments());
114-
let Some(merged_segments) = segments_diff.apply(latest_snapshot.segments().to_vec()) else {
191+
// Get the pre-computed segments diff for this table (computed before retry loop)
192+
let segments_diff = table_segments_diffs.get(&tid).ok_or_else(|| {
193+
ErrorCode::Internal(format!("Missing segments diff for table {}", tid))
194+
})?;
195+
196+
let Some(merged_segments) = segments_diff
197+
.clone()
198+
.apply(latest_snapshot.segments().to_vec())
199+
else {
115200
return Err(ErrorCode::UnresolvableConflict(format!(
116201
"Unresolvable conflict detected for table {}",
117202
tid
118203
)));
119204
};
120205

206+
// Read the original transaction-generated snapshot from cache for statistics merging
207+
let new_snapshot = table_original_snapshots
208+
.get(&tid)
209+
.ok_or_else(|| {
210+
ErrorCode::Internal(format!("Missing original snapshot for table {}", tid))
211+
})?
212+
.clone();
213+
121214
let s = merge_statistics(
122215
new_snapshot.summary(),
123216
&latest_snapshot.summary(),
@@ -214,6 +307,7 @@ async fn try_rebuild_req(
214307
latest_snapshot.table_statistics_location(),
215308
table_meta_timestamps,
216309
)?;
310+
merged_snapshot.ensure_segments_unique()?;
217311

218312
// write snapshot
219313
let dal = latest_table.get_operator();

src/query/storages/fuse/src/retry/diff.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashSet;
1717

1818
use databend_storages_common_table_meta::meta::Location;
1919

20+
#[derive(Clone)]
2021
pub struct SegmentsDiff {
2122
appended: Vec<Location>,
2223
replaced: HashMap<Location, Vec<Location>>,

0 commit comments

Comments
 (0)