Skip to content
2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ build_exceptions! {
DatabaseAlreadyExists(2301),
/// Table already exists
TableAlreadyExists(2302),
/// Database version mismatch
DatabaseVersionMismatched(2303),
/// View already exists
ViewAlreadyExists(2306),
/// Create table with drop time
Expand Down
63 changes: 63 additions & 0 deletions src/meta/api/src/database_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use chrono::Utc;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::CreateDatabaseWithDropTime;
use databend_common_meta_app::app_error::DatabaseAlreadyExists;
use databend_common_meta_app::app_error::DatabaseVersionMismatched;
use databend_common_meta_app::app_error::UndropDbHasNoHistory;
use databend_common_meta_app::app_error::UndropDbWithNoDropTime;
use databend_common_meta_app::app_error::UnknownDatabase;
Expand All @@ -43,10 +44,13 @@ use databend_common_meta_app::schema::RenameDatabaseReply;
use databend_common_meta_app::schema::RenameDatabaseReq;
use databend_common_meta_app::schema::UndropDatabaseReply;
use databend_common_meta_app::schema::UndropDatabaseReq;
use databend_common_meta_app::schema::UpdateDatabaseOptionsReply;
use databend_common_meta_app::schema::UpdateDatabaseOptionsReq;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
Expand All @@ -65,6 +69,7 @@ use crate::error_util::db_has_to_not_exist;
use crate::fetch_id;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
Expand Down Expand Up @@ -494,6 +499,64 @@ where
}
}

#[logcall::logcall]
#[fastrace::trace]
async fn update_database_options(
&self,
req: UpdateDatabaseOptionsReq,
) -> Result<UpdateDatabaseOptionsReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let db_id = req.db_id;
let expected_seq = req.expected_meta_seq;
let new_options = req.options.clone();
let db_key = DatabaseId::new(db_id);

let seq_meta = self.get_pb(&db_key).await?;
let Some(seq_meta) = seq_meta else {
return Err(KVAppError::AppError(AppError::UnknownDatabaseId(
UnknownDatabaseId::new(db_id, "update_database_options"),
)));
};

if seq_meta.seq != expected_seq {
return Err(KVAppError::AppError(AppError::DatabaseVersionMismatched(
DatabaseVersionMismatched::new(
db_id,
MatchSeq::Exact(expected_seq),
seq_meta.seq,
"update_database_options",
),
)));
}

let mut meta = seq_meta.data;
meta.options = new_options;
meta.updated_on = Utc::now();

let upsert = UpsertPB::update_exact(db_key, SeqV::new(expected_seq, meta));
let transition = self.upsert_pb(&upsert).await?;

if !transition.is_changed() {
let curr_seq = self
.get_pb(&db_key)
.await?
.map(|v| v.seq())
.unwrap_or_default();

return Err(KVAppError::AppError(AppError::DatabaseVersionMismatched(
DatabaseVersionMismatched::new(
db_id,
MatchSeq::Exact(expected_seq),
curr_seq,
"update_database_options",
),
)));
}

Ok(UpdateDatabaseOptionsReply {})
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_database(&self, req: GetDatabaseReq) -> Result<Arc<DatabaseInfo>, KVAppError> {
Expand Down
28 changes: 28 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,26 @@ impl TableVersionMismatched {
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("DatabaseVersionMismatched: {db_id} expect `{expect}` but `{curr}` while `{context}`")]
pub struct DatabaseVersionMismatched {
db_id: u64,
expect: MatchSeq,
curr: u64,
context: String,
}

impl DatabaseVersionMismatched {
pub fn new(db_id: u64, expect: MatchSeq, curr: u64, context: impl Into<String>) -> Self {
Self {
db_id,
expect,
curr,
context: context.into(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("StreamAlreadyExists: {name} while {context}")]
pub struct StreamAlreadyExists {
Expand Down Expand Up @@ -1010,6 +1030,9 @@ pub enum AppError {
#[error(transparent)]
TableVersionMismatched(#[from] TableVersionMismatched),

#[error(transparent)]
DatabaseVersionMismatched(#[from] DatabaseVersionMismatched),

#[error(transparent)]
DuplicatedUpsertFiles(#[from] DuplicatedUpsertFiles),

Expand Down Expand Up @@ -1293,6 +1316,8 @@ impl AppErrorMessage for UnknownDatabaseId {}

impl AppErrorMessage for TableVersionMismatched {}

impl AppErrorMessage for DatabaseVersionMismatched {}

impl AppErrorMessage for StreamAlreadyExists {
fn message(&self) -> String {
format!("'{}' as stream Already Exists", self.name)
Expand Down Expand Up @@ -1653,6 +1678,9 @@ impl From<AppError> for ErrorCode {
AppError::UndropTableHasNoHistory(err) => {
ErrorCode::UndropTableHasNoHistory(err.message())
}
AppError::DatabaseVersionMismatched(err) => {
ErrorCode::DatabaseVersionMismatched(err.message())
}
AppError::TableVersionMismatched(err) => {
ErrorCode::TableVersionMismatched(err.message())
}
Expand Down
23 changes: 23 additions & 0 deletions src/meta/app/src/schema/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,29 @@ pub struct DropDatabaseReply {
pub db_id: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateDatabaseOptionsReq {
pub db_id: u64,
/// The database meta sequence the caller observed. Used for CAS semantics.
pub expected_meta_seq: u64,
/// The complete option map that should replace the existing options when the
/// expected meta sequence still matches.
pub options: BTreeMap<String, String>,
}

impl Display for UpdateDatabaseOptionsReq {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"update_db_options:{}@{}={:?}",
self.db_id, self.expected_meta_seq, self.options
)
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateDatabaseOptionsReply {}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UndropDatabaseReq {
pub name_ident: DatabaseNameIdent,
Expand Down
2 changes: 2 additions & 0 deletions src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub use database::RenameDatabaseReq;
pub use database::ShareDbId;
pub use database::UndropDatabaseReply;
pub use database::UndropDatabaseReq;
pub use database::UpdateDatabaseOptionsReply;
pub use database::UpdateDatabaseOptionsReq;
pub use database_id::DatabaseId;
pub use database_id_history_ident::DatabaseIdHistoryIdent;
pub use dictionary::*;
Expand Down
23 changes: 22 additions & 1 deletion src/query/ast/src/ast/statements/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,17 @@ impl Display for CreateDatabaseStmt {
write!(f, " ENGINE = {engine}")?;
}

// TODO(leiysky): display rest information
if !self.options.is_empty() {
write!(f, " OPTIONS (")?;
for (i, option) in self.options.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{} = '{}'", option.name, option.value)?;
}
write!(f, ")")?;
}

Ok(())
}
}
Expand Down Expand Up @@ -169,6 +179,16 @@ impl Display for AlterDatabaseStmt {
AlterDatabaseAction::RefreshDatabaseCache => {
write!(f, " REFRESH CACHE")?;
}
AlterDatabaseAction::SetOptions { options } => {
write!(f, " SET OPTIONS (")?;
for (i, option) in options.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{} = '{}'", option.name, option.value)?;
}
write!(f, ")")?;
}
}

Ok(())
Expand All @@ -179,6 +199,7 @@ impl Display for AlterDatabaseStmt {
pub enum AlterDatabaseAction {
RenameDatabase { new_db: Identifier },
RefreshDatabaseCache,
SetOptions { options: Vec<SQLProperty> },
}

#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
Expand Down
65 changes: 46 additions & 19 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub type ShareDatabaseParams = (ShareNameIdent, Identifier);
#[derive(Clone)]
pub enum CreateDatabaseOption {
DatabaseEngine(DatabaseEngine),
Options(Vec<SQLProperty>),
}

fn procedure_type_name(i: Input) -> IResult<Vec<TypeName>> {
Expand Down Expand Up @@ -799,28 +800,24 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
~ ( DATABASE | SCHEMA )
~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #database_ref
~ #create_database_option?
~ ( ENGINE ~ ^"=" ~ ^#database_engine )?
~ ( OPTIONS ~ ^"(" ~ ^#sql_property_list ~ ^")" )?
},
|(_, opt_or_replace, _, opt_if_not_exists, database, create_database_option)| {
|(_, opt_or_replace, _, opt_if_not_exists, database, engine_opt, options_opt)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;

let statement = match create_database_option {
Some(CreateDatabaseOption::DatabaseEngine(engine)) => {
Statement::CreateDatabase(CreateDatabaseStmt {
create_option,
database,
engine: Some(engine),
options: vec![],
})
}
None => Statement::CreateDatabase(CreateDatabaseStmt {
create_option,
database,
engine: None,
options: vec![],
}),
};
let engine = engine_opt.map(|(_, _, engine)| engine);
let options = options_opt
.map(|(_, _, options, _)| options)
.unwrap_or_default();

let statement = Statement::CreateDatabase(CreateDatabaseStmt {
create_option,
database,
engine,
options,
});

Ok(statement)
},
Expand Down Expand Up @@ -4253,9 +4250,17 @@ pub fn alter_database_action(i: Input) -> IResult<AlterDatabaseAction> {
|(_, _)| AlterDatabaseAction::RefreshDatabaseCache,
);

let set_options = map(
rule! {
SET ~ OPTIONS ~ "(" ~ #sql_property_list ~ ")"
},
|(_, _, _, options, _)| AlterDatabaseAction::SetOptions { options },
);

rule!(
#rename_database
| #refresh_cache
| #set_options
)
.parse(i)
}
Expand Down Expand Up @@ -5218,19 +5223,41 @@ pub fn database_engine(i: Input) -> IResult<DatabaseEngine> {
}

pub fn create_database_option(i: Input) -> IResult<CreateDatabaseOption> {
let mut create_db_engine = parser_fn(map(
let create_db_engine = parser_fn(map(
rule! {
ENGINE ~ ^"=" ~ ^#database_engine
},
|(_, _, option)| CreateDatabaseOption::DatabaseEngine(option),
));

let create_db_options = map(
rule! {
OPTIONS ~ "(" ~ #sql_property_list ~ ")"
},
|(_, _, options, _)| CreateDatabaseOption::Options(options),
);

rule!(
#create_db_engine
| #create_db_options
)
.parse(i)
}

pub fn sql_property_list(i: Input) -> IResult<Vec<SQLProperty>> {
let property = map(
rule! {
#ident ~ "=" ~ #option_to_string
},
|(name, _, value)| SQLProperty {
name: name.name,
value,
},
);

comma_separated_list1(property).parse(i)
}

pub fn catalog_type(i: Input) -> IResult<CatalogType> {
alt((
value(CatalogType::Default, rule! { DEFAULT }),
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ fn test_statement() {
r#"create database if not exists a;"#,
r#"create database ctl.t engine = Default;"#,
r#"create database t engine = Default;"#,
r#"create database test_db OPTIONS (DEFAULT_STORAGE_CONNECTION = 'my_conn', DEFAULT_STORAGE_PATH = 's3://bucket/path');"#,
r#"create database mydb ENGINE = DEFAULT OPTIONS (DEFAULT_STORAGE_CONNECTION = 'test_conn', DEFAULT_STORAGE_PATH = 's3://test/path');"#,
r#"CREATE TABLE `t3`(a int not null, b int not null, c int not null) bloom_index_columns='a,b,c' COMPRESSION='zstd' STORAGE_FORMAT='native';"#,
r#"create or replace database a;"#,
r#"drop database ctl.t;"#,
Expand All @@ -170,6 +172,7 @@ fn test_statement() {
r#"create view v1(c1) as select number % 3 as a from numbers(1000);"#,
r#"create or replace view v1(c1) as select number % 3 as a from numbers(1000);"#,
r#"alter view v1(c2) as select number % 3 as a from numbers(1000);"#,
r#"alter database test_db SET OPTIONS (DEFAULT_STORAGE_CONNECTION = 'updated_conn');"#,
r#"show views"#,
r#"show views format TabSeparatedWithNamesAndTypes;"#,
r#"show full views"#,
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/tests/it/testdata/stmt-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ error:
--> SQL:1:23
|
1 | alter database system x rename to db
| ----- ^ unexpected `x`, expecting `RENAME`, `REFRESH`, or `.`
| ----- ^ unexpected `x`, expecting `RENAME`, `REFRESH`, `SET`, or `.`
| |
| while parsing `ALTER DATABASE [IF EXISTS] <action>`

Expand Down
Loading
Loading