Commit d7bf3cf

feat: basic support of schema evolution in copy for parquet (#19094)
* chore: add comments.
* chore: TableInfo add fn get_option
* add table option ENABLE_SCHEMA_EVOLUTION
* feat: basic support of schema evolution in copy for parquet
* feat: commit txn when table is altered.
1 parent 435aec1 commit d7bf3cf

17 files changed, +292 −55 lines changed


src/common/storage/src/stage.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -98,7 +98,7 @@ pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
 }
 
 /// select * from @s1/<path> (FILES => <files> PATTERN => <pattern>)
 /// copy from @s1/<path> FILES = <files> PATTERN => <pattern>
-#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug, Default)]
 pub struct StageFilesInfo {
     pub path: String,
     pub files: Option<Vec<String>>,
```
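
`Default` joins the derive list here because `StageTableInfo` (edited later in this commit) now derives `Default`, and a derived `Default` requires every field type, including this `StageFilesInfo`, to implement it. A minimal sketch of that constraint, with shortened illustrative names:

```rust
// Sketch: a derived Default requires Default on all field types.
#[derive(Default)]
struct FilesInfo {
    path: String,               // "" by default
    files: Option<Vec<String>>, // None by default
}

#[derive(Default)]
struct TableInfoSketch {
    files_info: FilesInfo, // would not compile without FilesInfo: Default
}

fn main() {
    let t = TableInfoSketch::default();
    assert!(t.files_info.path.is_empty());
    assert!(t.files_info.files.is_none());
}
```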

src/meta/app/src/principal/user_stage.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -402,6 +402,7 @@ pub struct StageParams {
     pub storage: StorageParams,
 }
 
+// todo(yangxiufeng): not used, remove it later.
 #[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq)]
 #[serde(default)]
 pub struct CopyOptions {
@@ -428,6 +429,7 @@ pub struct StageInfo {
     // on `COPY INTO xx FROM 's3://xxx?ak=?&sk=?'`, the URL (ExternalLocation) is treated as a temporary stage.
     pub is_temporary: bool,
     pub file_format_params: FileFormatParams,
+    // todo(yangxiufeng): not used, remove it later.
     pub copy_options: CopyOptions,
     pub comment: String,
     /// TODO(xuanwo): stage doesn't have this info anymore, remove it.
```

src/meta/app/src/schema/table/mod.rs

Lines changed: 8 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@ use std::fmt::Display;
 use std::fmt::Formatter;
 use std::ops::Deref;
 use std::ops::Range;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -316,6 +317,13 @@ impl TableInfo {
             .clone()
             .map(|k| (self.meta.cluster_key_seq, k))
     }
+
+    pub fn get_option<T: FromStr>(&self, opt_key: &str, default: T) -> T {
+        self.options()
+            .get(opt_key)
+            .and_then(|s| s.parse::<T>().ok())
+            .unwrap_or(default)
+    }
 }
 
 impl Default for TablePartition {
```
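
The new helper reads any `FromStr`-parseable option with a typed default: a missing key or an unparsable value silently falls back. A self-contained sketch of the same pattern, assuming the options are stored as a `BTreeMap<String, String>` as in `TableMeta` (the keys here are illustrative):

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

// Sketch of TableInfo::get_option against a plain string-keyed option map.
fn get_option<T: FromStr>(options: &BTreeMap<String, String>, opt_key: &str, default: T) -> T {
    options
        .get(opt_key)
        .and_then(|s| s.parse::<T>().ok()) // a malformed value falls back to the default
        .unwrap_or(default)
}

fn main() {
    let mut options = BTreeMap::new();
    options.insert("enable_schema_evolution".to_string(), "true".to_string());
    // A present, well-formed value parses to the requested type.
    assert!(get_option(&options, "enable_schema_evolution", false));
    // A missing key yields the caller's default instead of an error.
    assert_eq!(get_option(&options, "block_size", 100u64), 100);
}
```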

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 16 additions & 1 deletion
```diff
@@ -27,7 +27,9 @@ use databend_common_storage::init_stage_operator;
 use databend_common_storage::StageFileInfo;
 use databend_common_storage::StageFilesInfo;
 
-#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+use crate::plan::FullParquetMeta;
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Default)]
 pub struct StageTableInfo {
     // common
     pub stage_root: String,
@@ -47,8 +49,21 @@ pub struct StageTableInfo {
     pub is_select: bool,
     pub copy_into_table_options: CopyIntoTableOptions,
     pub is_variant: bool,
+
+    // Temporary workaround: when enable_schema_evolution is on, this is set
+    // before reading partitions; the StageTableInfo is dropped afterwards,
+    // so it never needs to be freed explicitly.
+    #[serde(skip)]
+    pub parquet_metas: Option<Vec<Arc<FullParquetMeta>>>,
+}
+
+impl PartialEq for StageTableInfo {
+    fn eq(&self, other: &Self) -> bool {
+        self.stage_root == other.stage_root && self.stage_info == other.stage_info
+    }
 }
 
+impl Eq for StageTableInfo {}
+
 impl StageTableInfo {
     pub fn schema(&self) -> Arc<TableSchema> {
         self.schema.clone()
```
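
Dropping the derived `PartialEq, Eq` in favor of hand-written impls is what lets the cached `parquet_metas` stay out of both equality and serialization: `#[serde(skip)]` keeps it off the wire, and `eq` compares only the identifying fields. A minimal sketch of the pattern, with hypothetical field names:

```rust
use std::sync::Arc;

use serde::{Deserialize, Serialize};

// Sketch: a non-identifying cache field is skipped during (de)serialization
// and ignored by equality; only the identifying fields participate in eq.
#[derive(Serialize, Deserialize, Clone, Default)]
struct Info {
    root: String,
    #[serde(skip)]
    cache: Option<Arc<Vec<u8>>>, // stand-in for Vec<Arc<FullParquetMeta>>
}

impl PartialEq for Info {
    fn eq(&self, other: &Self) -> bool {
        self.root == other.root
    }
}

impl Eq for Info {}

fn main() {
    let a = Info { root: "s3://stage".into(), cache: Some(Arc::new(vec![1])) };
    let b = Info { root: "s3://stage".into(), cache: None };
    assert!(a == b); // equal despite differing caches
}
```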

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -47,6 +47,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_COMMENT;
 use databend_storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
 use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
 use databend_storages_common_table_meta::table::OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH;
+use databend_storages_common_table_meta::table::OPT_KEY_ENABLE_SCHEMA_EVOLUTION;
 use databend_storages_common_table_meta::table::OPT_KEY_ENGINE;
 use databend_storages_common_table_meta::table::OPT_KEY_LOCATION;
 use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_MAX_ARRAY_LEN;
@@ -90,6 +91,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
     r.insert(OPT_KEY_TEMP_PREFIX);
     r.insert(OPT_KEY_SEGMENT_FORMAT);
     r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
+    r.insert(OPT_KEY_ENABLE_SCHEMA_EVOLUTION);
     r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY);
     r
 });
```
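
Registering the constant in `CREATE_FUSE_OPTIONS` is what makes the option legal in `CREATE TABLE`: unknown options are rejected against this static allow-list. A simplified sketch of the pattern (the literal key string is an assumption; the real code uses the `OPT_KEY_ENABLE_SCHEMA_EVOLUTION` constant):

```rust
use std::collections::HashSet;
use std::sync::LazyLock;

// Static allow-list of CREATE TABLE options, mirroring CREATE_FUSE_OPTIONS.
static CREATE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
    let mut r = HashSet::new();
    r.insert("enable_schema_evolution"); // the newly registered key
    r.insert("compression");
    r
});

fn validate_option(key: &str) -> Result<(), String> {
    if CREATE_OPTIONS.contains(key.to_lowercase().as_str()) {
        Ok(())
    } else {
        Err(format!("unknown CREATE TABLE option: {key}"))
    }
}

fn main() {
    assert!(validate_option("ENABLE_SCHEMA_EVOLUTION").is_ok());
    assert!(validate_option("no_such_option").is_err());
}
```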

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 173 additions & 9 deletions
```diff
@@ -13,26 +13,46 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::sync::Arc;
 
+use arrow_schema::Field;
+use databend_common_ast::ast::ColumnMatchMode;
+use databend_common_ast::Span;
 use databend_common_catalog::lock::LockTableOption;
+use databend_common_catalog::plan::StageTableInfo;
 use databend_common_catalog::table::TableExt;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
 use databend_common_expression::types::Int32Type;
 use databend_common_expression::types::StringType;
 use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchema;
 use databend_common_expression::FromData;
+use databend_common_expression::RemoteDefaultExpr;
+use databend_common_expression::RemoteExpr;
+use databend_common_expression::Scalar;
 use databend_common_expression::SendableDataBlockStream;
+use databend_common_expression::TableDataType;
+use databend_common_expression::TableField;
+use databend_common_expression::TableSchemaRef;
+use databend_common_meta_app::principal::FileFormatParams;
 use databend_common_meta_app::schema::TableInfo;
 use databend_common_meta_app::schema::UpdateStreamMetaReq;
 use databend_common_pipeline::core::Pipeline;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
+use databend_common_storage::init_stage_operator;
+use databend_common_storage::parquet::infer_schema_with_extension;
 use databend_common_storage::StageFileInfo;
 use databend_common_storages_fuse::FuseTable;
+use databend_common_storages_parquet::read_metas_in_parallel_for_copy;
 use databend_common_storages_stage::StageTable;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
+use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
+use itertools::Itertools;
 use log::debug;
 use log::info;
@@ -100,10 +120,15 @@ impl CopyIntoTableInterpreter {
     #[async_backtrace::framed]
     pub async fn build_physical_plan(
         &self,
-        table_info: TableInfo,
+        mut table_info: TableInfo,
         plan: &CopyIntoTablePlan,
         table_meta_timestamps: TableMetaTimestamps,
-    ) -> Result<(PhysicalPlan, Vec<UpdateStreamMetaReq>)> {
+    ) -> Result<(
+        PhysicalPlan,
+        Vec<UpdateStreamMetaReq>,
+        Option<TableSchemaRef>,
+    )> {
+        let mut new_schema = None;
         let mut update_stream_meta_reqs = vec![];
         let (source, project_columns) = if let Some(ref query) = plan.query {
             let query = if plan.enable_distributed {
@@ -122,7 +147,14 @@ impl CopyIntoTableInterpreter {
                 Some(result_columns),
             )
         } else {
-            let stage_table = StageTable::try_create(plan.stage_table_info.clone())?;
+            let mut stage_table_info = plan.stage_table_info.clone();
+            if plan.enable_schema_evolution {
+                new_schema = Self::infer_schema(&mut stage_table_info, self.ctx.clone())
+                    .await
+                    .map_err(|e| e.with_context("infer_schema"))?;
+            }
+
+            let stage_table = StageTable::try_create(stage_table_info)?;
 
             let data_source_plan = stage_table
                 .read_plan(self.ctx.clone(), None, None, false, false)
@@ -147,10 +179,19 @@ impl CopyIntoTableInterpreter {
             )
         };
 
+        let mut required_values_schema = plan.required_values_schema.clone();
+        let mut required_source_schema = plan.required_source_schema.clone();
+        if let Some(schema) = &new_schema {
+            table_info.meta.schema = schema.clone();
+            let data_schema: DataSchema = schema.into();
+            required_source_schema = Arc::new(data_schema);
+            required_values_schema = required_source_schema.clone();
+        }
+
         let mut root = PhysicalPlan::new(CopyIntoTable {
-            required_values_schema: plan.required_values_schema.clone(),
+            required_values_schema,
             values_consts: plan.values_consts.clone(),
-            required_source_schema: plan.required_source_schema.clone(),
+            required_source_schema,
             stage_table_info: plan.stage_table_info.clone(),
             table_info,
             write_mode: plan.write_mode,
@@ -176,7 +217,108 @@ impl CopyIntoTableInterpreter {
         let mut next_plan_id = 0;
         root.adjust_plan_id(&mut next_plan_id);
 
-        Ok((root, update_stream_meta_reqs))
+        Ok((root, update_stream_meta_reqs, new_schema))
+    }
+
+    async fn infer_schema(
+        stage_table_info: &mut StageTableInfo,
+        ctx: Arc<dyn TableContext>,
+    ) -> Result<Option<TableSchemaRef>> {
+        #[allow(clippy::single_match)]
+        match &stage_table_info.stage_info.file_format_params {
+            FileFormatParams::Parquet(_) => {
+                let settings = ctx.get_settings();
+                let max_threads = settings.get_max_threads()? as usize;
+                let max_memory_usage = settings.get_max_memory_usage()?;
+
+                let operator = init_stage_operator(&stage_table_info.stage_info)?;
+                // The files were set explicitly by the user.
+                let files = stage_table_info.files_to_copy.as_ref().expect(
+                    "ParquetTableForCopy::do_read_partitions must be called with files_to_copy set",
+                );
+                let file_infos = files
+                    .iter()
+                    .filter(|f| f.size > 0)
+                    .map(|f| (f.path.clone(), f.size))
+                    .collect::<Vec<_>>();
+                ctx.set_status_info("[TABLE-SCAN] Infer Parquet Schemas");
+                let metas = read_metas_in_parallel_for_copy(
+                    &operator,
+                    &file_infos,
+                    max_threads,
+                    max_memory_usage,
+                )
+                .await?;
+
+                let case_sensitive = stage_table_info.copy_into_table_options.column_match_mode
+                    == Some(ColumnMatchMode::CaseSensitive);
+
+                // Existing table columns, keyed by name (lowercased unless
+                // column matching is case-sensitive).
+                let mut new_schema = stage_table_info.schema.as_ref().to_owned();
+                let old_fields: HashMap<String, TableDataType> = stage_table_info
+                    .schema
+                    .fields
+                    .iter()
+                    .map(|f| {
+                        (
+                            if case_sensitive {
+                                f.name.clone()
+                            } else {
+                                f.name.to_lowercase()
+                            },
+                            f.data_type.clone(),
+                        )
+                    })
+                    .collect::<_>();
+                // Columns present in the Parquet files but not in the table;
+                // a column new to the table must have the same type in every file.
+                let mut new_fields: HashMap<String, Field> = HashMap::new();
+                for meta in &metas {
+                    let arrow_schema = infer_schema_with_extension(meta.meta.file_metadata())?;
+                    for field in arrow_schema.fields().clone().into_iter() {
+                        let name = if case_sensitive {
+                            field.name().clone()
+                        } else {
+                            field.name().to_lowercase()
+                        };
+                        if !old_fields.contains_key(&name) {
+                            if let Some(f) = new_fields.get_mut(&name) {
+                                if f.data_type() != field.data_type() {
+                                    return Err(ErrorCode::BadBytes(format!(
+                                        "data type of {name} mismatch: {} and {}",
+                                        f.data_type(),
+                                        field.data_type()
+                                    )));
+                                }
+                            } else {
+                                new_fields.insert(name, field.as_ref().clone());
+                            }
+                        }
+                    }
+                }
+
+                stage_table_info.parquet_metas = Some(metas);
+                if new_fields.is_empty() {
+                    return Ok(None);
+                } else {
+                    // Append each new column as nullable, with a NULL default expression.
+                    let new_fields: Vec<_> = new_fields.into_iter().sorted().collect();
+                    for (_, f) in new_fields {
+                        let mut tf: TableField = (&f).try_into()?;
+                        tf.data_type = tf.data_type.wrap_nullable();
+                        if let Some(exprs) = &mut stage_table_info.default_exprs {
+                            exprs.push(RemoteDefaultExpr::RemoteExpr(RemoteExpr::Constant {
+                                scalar: Scalar::Null,
+                                data_type: DataType::Null,
+                                span: Span::default(),
+                            }))
+                        }
+                        new_schema.add_column(&tf, new_schema.num_fields())?;
+                    }
+                    let schema = Arc::new(new_schema);
+                    stage_table_info.schema = schema.clone();
+                    return Ok(Some(schema));
+                }
+            }
+            _ => {}
+        }
+        Ok(None)
     }
 
     fn get_copy_into_table_result(&self) -> Result<Vec<DataBlock>> {
```
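
At its core, `infer_schema` folds each Parquet file's Arrow schema into the table schema: names are lowercased unless column matching is case-sensitive, columns already in the table are left alone, columns new to the table must carry one consistent type across all files, and the accepted columns are appended in sorted name order. A self-contained sketch of that merge, with plain strings standing in for the real Arrow/Table types:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Simplified schema merge: collect (name, type) pairs that are new to the
// table, requiring a single consistent type across all files.
fn merge_new_columns(
    old_names: &[&str],
    files: &[Vec<(&str, &str)>],
    case_sensitive: bool,
) -> Result<Vec<(String, String)>, String> {
    let norm = |s: &str| {
        if case_sensitive {
            s.to_string()
        } else {
            s.to_lowercase()
        }
    };
    let old: Vec<String> = old_names.iter().map(|n| norm(n)).collect();
    let mut new_fields: HashMap<String, String> = HashMap::new();
    for file in files {
        for &(name, ty) in file {
            let name = norm(name);
            if old.contains(&name) {
                continue; // existing columns keep the table's type
            }
            match new_fields.entry(name) {
                Entry::Occupied(e) => {
                    if e.get() != ty {
                        return Err(format!(
                            "data type of {} mismatch: {} and {}",
                            e.key(),
                            e.get(),
                            ty
                        ));
                    }
                }
                Entry::Vacant(v) => {
                    v.insert(ty.to_string());
                }
            }
        }
    }
    // Sort for a deterministic append order, as the real code does via itertools.
    let mut added: Vec<_> = new_fields.into_iter().collect();
    added.sort();
    Ok(added)
}

fn main() {
    let files = vec![
        vec![("id", "Int64"), ("City", "String")],
        vec![("city", "String")],
    ];
    // Case-insensitive match: "City" and "city" are one new column.
    let added = merge_new_columns(&["id"], &files, false).unwrap();
    assert_eq!(added, vec![("city".to_string(), "String".to_string())]);
}
```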

```diff
@@ -234,18 +376,39 @@ impl CopyIntoTableInterpreter {
         deduplicated_label: Option<String>,
         path_prefix: Option<String>,
         table_meta_timestamps: TableMetaTimestamps,
+        new_schema: Option<TableSchemaRef>,
     ) -> Result<()> {
         let ctx = self.ctx.clone();
-        let to_table = ctx
+        let mut to_table = ctx
             .get_table(
                 plan.catalog_info.catalog_name(),
                 &plan.database_name,
                 &plan.table_name,
             )
             .await?;
 
+        let mut prev_snapshot_id = None;
+
         // Commit.
         {
+            let mut table_info = to_table.get_table_info().clone();
+            if let Some(new_schema) = new_schema {
+                // Remember the snapshot the evolved schema was derived from,
+                // so the commit can detect concurrent changes to the table.
+                let fuse_table = FuseTable::try_from_table(to_table.as_ref())?;
+                let base_snapshot = fuse_table.read_table_snapshot().await?;
+                prev_snapshot_id = base_snapshot.snapshot_id().map(|(id, _)| id);
+
+                table_info.meta.fill_field_comments();
+                while table_info.meta.field_comments.len() < new_schema.fields.len() {
+                    table_info.meta.field_comments.push("".to_string());
+                }
+                table_info.meta.schema = new_schema;
+                to_table = FuseTable::create_and_refresh_table_info(
+                    table_info,
+                    ctx.get_settings().get_s3_storage_class()?,
+                )?
+                .into();
+            }
+
             let copied_files_meta_req = PipelineBuilder::build_upsert_copied_files_to_meta_req(
                 ctx.clone(),
                 to_table.as_ref(),
@@ -260,7 +423,7 @@ impl CopyIntoTableInterpreter {
                 copied_files_meta_req,
                 update_stream_meta,
                 plan.write_mode.is_overwrite(),
-                None,
+                prev_snapshot_id,
                 deduplicated_label,
                 table_meta_timestamps,
             )?;
```
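
Passing `prev_snapshot_id` instead of `None` is what makes the copy commit transactional with respect to the schema change (the "commit txn when table is altered" item in the commit message): the commit can fail if another writer advanced the table past the snapshot the evolved schema was computed from. A hypothetical sketch of such a guard; the real verification lives inside the fuse commit pipeline, and real snapshot ids are UUIDs rather than integers:

```rust
// Hypothetical compare-and-swap style check implied by prev_snapshot_id.
fn check_snapshot_unchanged(
    prev_snapshot_id: Option<u64>,
    current_snapshot_id: Option<u64>,
) -> Result<(), String> {
    match prev_snapshot_id {
        // No schema change was made, nothing to guard.
        None => Ok(()),
        // The table must still point at the snapshot we altered the schema from.
        Some(prev) if current_snapshot_id == Some(prev) => Ok(()),
        Some(_) => Err("table changed concurrently; abort and retry the copy".to_string()),
    }
}

fn main() {
    assert!(check_snapshot_unchanged(None, Some(7)).is_ok());
    assert!(check_snapshot_unchanged(Some(7), Some(7)).is_ok());
    assert!(check_snapshot_unchanged(Some(7), Some(8)).is_err());
}
```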

```diff
@@ -371,7 +534,7 @@ impl Interpreter for CopyIntoTableInterpreter {
             .ctx
             .get_table_meta_timestamps(to_table.as_ref(), snapshot)?;
 
-        let (physical_plan, update_stream_meta) = self
+        let (physical_plan, update_stream_meta, new_schema) = self
             .build_physical_plan(
                 to_table.get_table_info().clone(),
                 &self.plan,
@@ -403,6 +566,7 @@ impl Interpreter for CopyIntoTableInterpreter {
                 unsafe { self.ctx.get_settings().get_deduplicate_label()? },
                 self.plan.path_prefix.clone(),
                 table_meta_timestamps,
+                new_schema,
             )
             .await?;
         }
```

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -413,7 +413,7 @@ impl ReplaceInterpreter {
             Plan::CopyIntoTable(copy_plan) => {
                 let interpreter =
                     CopyIntoTableInterpreter::try_create(ctx.clone(), *copy_plan.clone())?;
-                let (physical_plan, _) = interpreter
+                let (physical_plan, _, _) = interpreter
                     .build_physical_plan(table_info, &copy_plan, table_meta_timestamps)
                     .await?;
```

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -409,7 +409,7 @@ impl CreateTableInterpreter {
         if settings.get_copy_dedup_full_path_by_default()? {
             options.insert(
                 OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH.to_string(),
-                "1".to_string(),
+                "true".to_string(),
             );
         };
```
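
The stored value flips from "1" to "true", presumably because options are now read back through the `FromStr`-based `TableInfo::get_option` added in this commit, and Rust's `bool` parser accepts only "true"/"false":

```rust
fn main() {
    // bool::from_str rejects "1", so a stored "1" would silently fall back
    // to get_option's default instead of reading as enabled.
    assert!("1".parse::<bool>().is_err());
    assert!("true".parse::<bool>().unwrap());
}
```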
