Skip to content

Commit 803c9e9

Browse files
authored
Introduce KipDB for persistence (#54)
* feat: `Tuple` implement serialization * feat: implement `TableCodec` * refactor: remove `TableCatalog::table_id` * refactor: remove `Storage::tables` * refactor: `Binder` and `Storage` are asynchronous * feat: Introduce `KipDB` for persistence * fix: repair `quit` command
1 parent 4fe2b6f commit 803c9e9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1348
-569
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ chrono = "0.4.26"
1818
tokio = { version = "1.28.2", features = ["full"] }
1919
serde = { version = "1", features = ["derive", "rc"] }
2020
serde_json = "1"
21+
bincode = "1.3.3"
2122
async-trait = "0.1.68"
2223
integer-encoding = "3.0.4"
2324
strum_macros = "0.24"
@@ -28,13 +29,17 @@ futures = "0.3.25"
2829
ahash = "0.8.3"
2930
lazy_static = "1.4.0"
3031
comfy-table = "7.0.1"
32+
bytes = "*"
33+
kip_db = "0.1.2-alpha.10"
34+
async-recursion = "1.0.5"
3135

3236
[dev-dependencies]
3337
tokio-test = "0.4.2"
3438
ctor = "0.2.0"
3539
env_logger = "0.10"
3640
paste = "^1.0"
3741
rstest = "0.17"
42+
tempfile = "3.0.7"
3843

3944
[workspace]
4045
members = [

src/binder/aggregate.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,28 @@ impl Binder {
3232
Ok(())
3333
}
3434

35-
pub fn extract_group_by_aggregate(
35+
pub async fn extract_group_by_aggregate(
3636
&mut self,
3737
select_list: &mut [ScalarExpression],
3838
groupby: &[Expr],
3939
) -> Result<(), BindError> {
40-
self.validate_groupby_illegal_column(select_list, groupby)?;
40+
self.validate_groupby_illegal_column(select_list, groupby).await?;
4141

4242
for gb in groupby {
43-
let mut expr = self.bind_expr(gb)?;
43+
let mut expr = self.bind_expr(gb).await?;
4444
self.visit_group_by_expr(select_list, &mut expr);
4545
}
4646
Ok(())
4747
}
4848

49-
pub fn extract_having_orderby_aggregate(
49+
pub async fn extract_having_orderby_aggregate(
5050
&mut self,
5151
having: &Option<Expr>,
5252
orderbys: &[OrderByExpr],
5353
) -> Result<(Option<ScalarExpression>, Option<Vec<SortField>>), BindError> {
5454
// Extract having expression.
5555
let return_having = if let Some(having) = having {
56-
let mut having = self.bind_expr(having)?;
56+
let mut having = self.bind_expr(having).await?;
5757
self.visit_column_agg_expr(&mut having);
5858
Some(having)
5959
} else {
@@ -69,7 +69,7 @@ impl Binder {
6969
asc,
7070
nulls_first,
7171
} = orderby;
72-
let mut expr = self.bind_expr(expr)?;
72+
let mut expr = self.bind_expr(expr).await?;
7373
self.visit_column_agg_expr(&mut expr);
7474

7575
return_orderby.push(SortField::new(
@@ -135,14 +135,14 @@ impl Binder {
135135
/// e.g. SELECT a,count(b) FROM t GROUP BY a. it's ok.
136136
/// SELECT a,b FROM t GROUP BY a. it's error.
137137
/// SELECT a,count(b) FROM t GROUP BY b. it's error.
138-
fn validate_groupby_illegal_column(
138+
async fn validate_groupby_illegal_column(
139139
&mut self,
140140
select_items: &[ScalarExpression],
141141
groupby: &[Expr],
142142
) -> Result<(), BindError> {
143143
let mut group_raw_exprs = vec![];
144144
for expr in groupby {
145-
let expr = self.bind_expr(expr)?;
145+
let expr = self.bind_expr(expr).await?;
146146
if let ScalarExpression::Alias { alias, .. } = expr {
147147
let alias_expr = select_items.iter().find(|column| {
148148
if let ScalarExpression::Alias {

src/binder/create_table.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::collections::HashSet;
22
use std::sync::Arc;
3+
use itertools::Itertools;
34
use sqlparser::ast::{ColumnDef, ObjectName};
45

56
use super::Binder;
67
use crate::binder::{BindError, lower_case_name, split_name};
7-
use crate::catalog::{ColumnCatalog, ColumnRef};
8+
use crate::catalog::ColumnCatalog;
89
use crate::planner::LogicalPlan;
910
use crate::planner::operator::create_table::CreateTableOperator;
1011
use crate::planner::operator::Operator;
@@ -16,7 +17,8 @@ impl Binder {
1617
columns: &[ColumnDef],
1718
) -> Result<LogicalPlan, BindError> {
1819
let name = lower_case_name(&name);
19-
let (_, table_name) = split_name(&name)?;
20+
let (_, name) = split_name(&name)?;
21+
let table_name = Arc::new(name.to_string());
2022

2123
// check duplicated column names
2224
let mut set = HashSet::new();
@@ -27,15 +29,15 @@ impl Binder {
2729
}
2830
}
2931

30-
let columns: Vec<ColumnRef> = columns
32+
let columns = columns
3133
.iter()
32-
.map(|col| Arc::new(ColumnCatalog::from(col.clone())))
33-
.collect();
34+
.map(|col| ColumnCatalog::from(col.clone()))
35+
.collect_vec();
3436

3537
let plan = LogicalPlan {
3638
operator: Operator::CreateTable(
3739
CreateTableOperator {
38-
table_name: table_name.to_string(),
40+
table_name,
3941
columns
4042
}
4143
),
@@ -47,21 +49,26 @@ impl Binder {
4749

4850
#[cfg(test)]
4951
mod tests {
52+
use tempfile::TempDir;
5053
use super::*;
5154
use crate::binder::BinderContext;
52-
use crate::catalog::{ColumnDesc, RootCatalog};
55+
use crate::catalog::ColumnDesc;
56+
use crate::storage::kip::KipStorage;
5357
use crate::types::LogicalType;
5458

55-
#[test]
56-
fn test_create_bind() {
59+
#[tokio::test]
60+
async fn test_create_bind() {
61+
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
62+
let storage = KipStorage::new(temp_dir.path()).await.unwrap();
63+
5764
let sql = "create table t1 (id int , name varchar(10) null)";
58-
let binder = Binder::new(BinderContext::new(RootCatalog::new()));
65+
let binder = Binder::new(BinderContext::new(storage));
5966
let stmt = crate::parser::parse_sql(sql).unwrap();
60-
let plan1 = binder.bind(&stmt[0]).unwrap();
67+
let plan1 = binder.bind(&stmt[0]).await.unwrap();
6168

6269
match plan1.operator {
6370
Operator::CreateTable(op) => {
64-
assert_eq!(op.table_name, "t1".to_string());
71+
assert_eq!(op.table_name, Arc::new("t1".to_string()));
6572
assert_eq!(op.columns[0].name, "id".to_string());
6673
assert_eq!(op.columns[0].nullable, false);
6774
assert_eq!(op.columns[0].desc, ColumnDesc::new(LogicalType::Integer, false));

src/binder/expr.rs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,37 @@ use itertools::Itertools;
33
use sqlparser::ast::{BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident};
44
use std::slice;
55
use std::sync::Arc;
6+
use async_recursion::async_recursion;
67
use crate::expression::agg::AggKind;
78

89
use super::Binder;
910
use crate::expression::ScalarExpression;
11+
use crate::storage::Storage;
1012
use crate::types::LogicalType;
1113

1214
impl Binder {
13-
pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression, BindError> {
15+
#[async_recursion]
16+
pub(crate) async fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression, BindError> {
1417
match expr {
1518
Expr::Identifier(ident) => {
16-
self.bind_column_ref_from_identifiers(slice::from_ref(ident), None)
19+
self.bind_column_ref_from_identifiers(slice::from_ref(ident), None).await
1720
}
1821
Expr::CompoundIdentifier(idents) => {
19-
self.bind_column_ref_from_identifiers(idents, None)
22+
self.bind_column_ref_from_identifiers(idents, None).await
2023
}
2124
Expr::BinaryOp { left, right, op} => {
22-
self.bind_binary_op_internal(left, right, op)
25+
self.bind_binary_op_internal(left, right, op).await
2326
}
2427
Expr::Value(v) => Ok(ScalarExpression::Constant(Arc::new(v.into()))),
25-
Expr::Function(func) => self.bind_agg_call(func),
26-
Expr::Nested(expr) => self.bind_expr(expr),
28+
Expr::Function(func) => self.bind_agg_call(func).await,
29+
Expr::Nested(expr) => self.bind_expr(expr).await,
2730
_ => {
2831
todo!()
2932
}
3033
}
3134
}
3235

33-
pub fn bind_column_ref_from_identifiers(
36+
pub async fn bind_column_ref_from_identifiers(
3437
&mut self,
3538
idents: &[Ident],
3639
bind_table_name: Option<&String>,
@@ -58,8 +61,9 @@ impl Binder {
5861
if let Some(table) = table_name.or(bind_table_name) {
5962
let table_catalog = self
6063
.context
61-
.catalog
62-
.get_table_by_name(table)
64+
.storage
65+
.table_catalog(table)
66+
.await
6367
.ok_or_else(|| BindError::InvalidTable(table.to_string()))?;
6468

6569
let column_catalog = table_catalog
@@ -69,7 +73,7 @@ impl Binder {
6973
} else {
7074
// handle col syntax
7175
let mut got_column = None;
72-
for (_, table_catalog) in self.context.catalog.tables() {
76+
for (_, (table_catalog, _)) in &self.context.bind_table {
7377
if let Some(column_catalog) = table_catalog.get_column_by_name(column_name) {
7478
if got_column.is_some() {
7579
return Err(BindError::InvalidColumn(column_name.to_string()).into());
@@ -88,14 +92,14 @@ impl Binder {
8892
}
8993
}
9094

91-
fn bind_binary_op_internal(
95+
async fn bind_binary_op_internal(
9296
&mut self,
9397
left: &Expr,
9498
right: &Expr,
9599
op: &BinaryOperator,
96100
) -> Result<ScalarExpression, BindError> {
97-
let left_expr = Box::new(self.bind_expr(left)?);
98-
let right_expr = Box::new(self.bind_expr(right)?);
101+
let left_expr = Box::new(self.bind_expr(left).await?);
102+
let right_expr = Box::new(self.bind_expr(right).await?);
99103

100104
let ty = match op {
101105
BinaryOperator::Plus | BinaryOperator::Minus | BinaryOperator::Multiply |
@@ -121,20 +125,19 @@ impl Binder {
121125
})
122126
}
123127

124-
fn bind_agg_call(&mut self, func: &Function) -> Result<ScalarExpression, BindError> {
125-
let args: Vec<ScalarExpression> = func.args
126-
.iter()
127-
.map(|arg| {
128-
let arg_expr = match arg {
129-
FunctionArg::Named { arg, .. } => arg,
130-
FunctionArg::Unnamed(arg) => arg,
131-
};
132-
match arg_expr {
133-
FunctionArgExpr::Expr(expr) => self.bind_expr(expr),
134-
_ => todo!()
135-
}
136-
})
137-
.try_collect()?;
128+
async fn bind_agg_call(&mut self, func: &Function) -> Result<ScalarExpression, BindError> {
129+
let mut args = Vec::with_capacity(func.args.len());
130+
131+
for arg in func.args.iter() {
132+
let arg_expr = match arg {
133+
FunctionArg::Named { arg, .. } => arg,
134+
FunctionArg::Unnamed(arg) => arg,
135+
};
136+
match arg_expr {
137+
FunctionArgExpr::Expr(expr) => args.push(self.bind_expr(expr).await?),
138+
_ => todo!()
139+
}
140+
}
138141
let ty = args[0].return_type();
139142

140143
Ok(match func.name.to_string().to_lowercase().as_str() {

src/binder/insert.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,64 @@
11
use std::slice;
2+
use std::sync::Arc;
23
use sqlparser::ast::{Expr, Ident, ObjectName};
3-
use itertools::Itertools;
44
use crate::binder::{Binder, BindError, lower_case_name, split_name};
55
use crate::catalog::ColumnRef;
66
use crate::expression::ScalarExpression;
77
use crate::planner::LogicalPlan;
88
use crate::planner::operator::insert::InsertOperator;
99
use crate::planner::operator::Operator;
1010
use crate::planner::operator::values::ValuesOperator;
11+
use crate::storage::Storage;
1112
use crate::types::value::ValueRef;
1213

1314
impl Binder {
14-
pub(crate) fn bind_insert(
15+
pub(crate) async fn bind_insert(
1516
&mut self,
1617
name: ObjectName,
1718
idents: &[Ident],
18-
rows: &Vec<Vec<Expr>>
19+
expr_rows: &Vec<Vec<Expr>>
1920
) -> Result<LogicalPlan, BindError> {
2021
let name = lower_case_name(&name);
21-
let (_, table_name) = split_name(&name)?;
22+
let (_, name) = split_name(&name)?;
23+
let table_name = Arc::new(name.to_string());
2224

23-
if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
24-
let table_id = table.id;
25+
if let Some(table) = self.context.storage.table_catalog(&table_name).await {
2526
let mut columns = Vec::new();
2627

2728
if idents.is_empty() {
28-
columns = table.all_columns()
29-
.into_iter()
30-
.map(|(_, catalog)| catalog.clone())
31-
.collect_vec();
29+
columns = table.all_columns();
3230
} else {
3331
let bind_table_name = Some(table_name.to_string());
3432
for ident in idents {
3533
match self.bind_column_ref_from_identifiers(
3634
slice::from_ref(ident),
3735
bind_table_name.as_ref()
38-
)? {
36+
).await? {
3937
ScalarExpression::ColumnRef(catalog) => columns.push(catalog),
4038
_ => unreachable!()
4139
}
4240
}
4341
}
42+
let mut rows = Vec::with_capacity(expr_rows.len());
4443

45-
let rows = rows
46-
.into_iter()
47-
.map(|row| {
48-
row.into_iter()
49-
.map(|expr| match self.bind_expr(expr)? {
50-
ScalarExpression::Constant(value) => Ok::<ValueRef, BindError>(value),
51-
_ => unreachable!(),
52-
})
53-
.try_collect()
54-
})
55-
.try_collect()?;
44+
for expr_row in expr_rows {
45+
let mut row = Vec::with_capacity(expr_row.len());
5646

47+
for expr in expr_row {
48+
match self.bind_expr(expr).await? {
49+
ScalarExpression::Constant(value) => row.push(value),
50+
_ => unreachable!(),
51+
}
52+
}
53+
54+
rows.push(row);
55+
}
5756
let values_plan = self.bind_values(rows, columns);
5857

5958
Ok(LogicalPlan {
6059
operator: Operator::Insert(
6160
InsertOperator {
62-
table_id,
61+
table_name,
6362
}
6463
),
6564
childrens: vec![values_plan],

0 commit comments

Comments
 (0)