Skip to content

Commit de224d9

Browse files
committed
feat: supports multiple primary key types
- Tinyint - UTinyint - Smallint - USmallint - Integer - UInteger - Bigint - UBigint - Varchar
1 parent 93ca5d7 commit de224d9

File tree

13 files changed

+139
-124
lines changed

13 files changed

+139
-124
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ Storage Support:
4343
![demo](./static/images/demo.png)
4444

4545
### Features
46+
- Supports multiple primary key types
47+
- Tinyint
48+
- UTinyint
49+
- Smallint
50+
- USmallint
51+
- Integer
52+
- UInteger
53+
- Bigint
54+
- UBigint
55+
- Varchar
4656
- DDL
4757
- Create
4858
- [x] Table
@@ -82,6 +92,7 @@ Storage Support:
8292
- Float
8393
- Double
8494
- Varchar
95+
- Date
8596
- DateTime
8697
- Optimizer rules
8798
- Limit Project Transpose

src/binder/create_table.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use crate::planner::LogicalPlan;
1010
use crate::planner::operator::create_table::CreateTableOperator;
1111
use crate::planner::operator::Operator;
1212
use crate::storage::Storage;
13-
use crate::types::LogicalType;
1413

1514
impl<S: Storage> Binder<S> {
1615
pub(crate) fn bind_create_table(
@@ -30,17 +29,12 @@ impl<S: Storage> Binder<S> {
3029
return Err(BindError::AmbiguousColumn(col_name.to_string()));
3130
}
3231
}
33-
3432
let columns = columns
3533
.iter()
3634
.map(|col| ColumnCatalog::from(col.clone()))
3735
.collect_vec();
3836

39-
if let Some(col) = columns.iter().find(|col| col.desc.is_primary) {
40-
if !matches!(col.datatype(), LogicalType::Integer) {
41-
return Err(BindError::InvalidColumn("Primary key types only support signed integers".to_string()));
42-
}
43-
} else {
37+
if columns.iter().find(|col| col.desc.is_primary).is_none() {
4438
return Err(BindError::InvalidTable("At least one primary key field exists".to_string()));
4539
}
4640

src/execution/executor/dml/insert.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::execution::executor::{BoxedExecutor, Executor};
77
use crate::execution::ExecutorError;
88
use crate::planner::operator::insert::InsertOperator;
99
use crate::storage::{Storage, Table};
10-
use crate::types::{ColumnId, LogicalType};
10+
use crate::types::ColumnId;
1111
use crate::types::tuple::Tuple;
1212
use crate::types::value::{DataValue, ValueRef};
1313

@@ -49,25 +49,16 @@ impl Insert {
4949
.map(|(i, _)| i)
5050
.unwrap()
5151
});
52-
53-
let tuple_id = if let DataValue::Int64(Some(primary_id)) =
54-
DataValue::clone(&values[*primary_idx]).cast(&LogicalType::Bigint)?
55-
{
56-
primary_id
57-
} else {
58-
unreachable!("Primary key must have a value")
59-
};
60-
52+
let id = Some(values[*primary_idx].clone());
6153
let mut tuple_map: HashMap<ColumnId, ValueRef> = values
6254
.into_iter()
6355
.enumerate()
6456
.map(|(i, value)| (columns[i].id, value))
6557
.collect();
66-
6758
let all_columns = table_catalog.all_columns_with_id();
6859

6960
let mut tuple = Tuple {
70-
id: Some(tuple_id),
61+
id,
7162
columns: Vec::with_capacity(all_columns.len()),
7263
values: Vec::with_capacity(all_columns.len()),
7364
};

src/execution/executor/dml/update.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,19 @@ impl Update {
4545
value_map.insert(columns[i].id, values[i].clone());
4646
}
4747
}
48-
4948
#[for_await]
5049
for tuple in input {
5150
let mut tuple = tuple?;
5251

5352
for (i, column) in tuple.columns.iter().enumerate() {
5453
if let Some(value) = value_map.get(&column.id) {
54+
if column.desc.is_primary {
55+
if let Some(old_key) = tuple.id.replace(value.clone()) {
56+
if value != &old_key {
57+
table.delete(old_key)?;
58+
}
59+
}
60+
}
5561
tuple.values[i] = value.clone();
5662
}
5763
}

src/execution/executor/dql/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub struct Filter {
1313
}
1414

1515
impl From<(FilterOperator, BoxedExecutor)> for Filter {
16-
fn from((FilterOperator { predicate, having }, input): (FilterOperator, BoxedExecutor)) -> Self {
16+
fn from((FilterOperator { predicate, .. }, input): (FilterOperator, BoxedExecutor)) -> Self {
1717
Filter {
1818
predicate,
1919
input

src/execution/executor/dql/seq_scan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ impl<S: Storage> Executor<S> for SeqScan {
2626
impl SeqScan {
2727
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
2828
pub async fn _execute<S: Storage>(self, storage: S) {
29-
// TODO: sort_fields, pre_where, limit
3029
let ScanOperator { table_name, columns, limit, .. } = self.op;
3130

3231
if let Some(table) = storage.table(&table_name).await {

src/planner/operator/scan.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ use super::{sort::SortField, Operator};
1010
pub struct ScanOperator {
1111
pub table_name: TableName,
1212
pub columns: Vec<ScalarExpression>,
13-
// TODO:
13+
// Support push down limit.
14+
pub limit: Bounds,
15+
16+
// IndexScan only
1417
pub sort_fields: Vec<SortField>,
18+
// IndexScan only
1519
// Support push down predicate.
1620
// If pre_where is simple predicate, for example: a > 1 then can calculate directly when read data.
17-
// TODO:
1821
pub pre_where: Vec<ScalarExpression>,
19-
// Support push down limit.
20-
pub limit: Bounds,
2122
}
2223
impl ScanOperator {
2324
pub fn new(table_name: TableName, table_catalog: &TableCatalog) -> LogicalPlan {

src/storage/kip.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,14 @@ impl Table for KipTable {
164164
}
165165

166166
fn append(&mut self, tuple: Tuple) -> Result<(), StorageError> {
167-
let (key, value) = self.table_codec.encode_tuple(&tuple);
167+
let (key, value) = self.table_codec.encode_tuple(&tuple)?;
168168
self.tx.set(key, value);
169169

170170
Ok(())
171171
}
172172

173173
fn delete(&mut self, tuple_id: TupleId) -> Result<(), StorageError> {
174-
let key = self.table_codec.encode_tuple_key(&tuple_id);
174+
let key = self.table_codec.encode_tuple_key(&tuple_id)?;
175175
self.tx.remove(&key)?;
176176

177177
Ok(())
@@ -205,33 +205,31 @@ impl Transaction for KipTraction<'_> {
205205
}
206206
}
207207

208-
self.iter
209-
.try_next()?
210-
.and_then(|(key, bytes)| {
211-
bytes.and_then(|value| {
212-
self.table_codec.decode_tuple(&key, &value)
213-
.map(|tuple| {
214-
let projection_len = self.projections.len();
215-
216-
let mut columns = Vec::with_capacity(projection_len);
217-
let mut values = Vec::with_capacity(projection_len);
218-
219-
for expr in self.projections.iter() {
220-
values.push(expr.eval_column(&tuple)?);
221-
columns.push(expr.output_columns(&tuple));
222-
}
223-
224-
self.limit = self.limit.map(|num| num - 1);
225-
226-
Ok(Tuple {
227-
id: tuple.id,
228-
columns,
229-
values,
230-
})
231-
})
232-
})
233-
})
234-
.transpose()
208+
while let Some(item) = self.iter.try_next()? {
209+
if let (_, Some(value)) = item {
210+
let tuple = self.table_codec.decode_tuple(&value);
211+
212+
let projection_len = self.projections.len();
213+
214+
let mut columns = Vec::with_capacity(projection_len);
215+
let mut values = Vec::with_capacity(projection_len);
216+
217+
for expr in self.projections.iter() {
218+
values.push(expr.eval_column(&tuple)?);
219+
columns.push(expr.output_columns(&tuple));
220+
}
221+
222+
self.limit = self.limit.map(|num| num - 1);
223+
224+
return Ok(Some(Tuple {
225+
id: tuple.id,
226+
columns,
227+
values,
228+
}))
229+
}
230+
}
231+
232+
Ok(None)
235233
}
236234
}
237235

@@ -246,6 +244,7 @@ mod test {
246244
use crate::storage::{Storage, StorageError, Transaction, Table};
247245
use crate::storage::memory::test::data_filling;
248246
use crate::types::LogicalType;
247+
use crate::types::value::DataValue;
249248

250249
#[tokio::test]
251250
async fn test_in_kipdb_storage_works_with_data() -> Result<(), StorageError> {
@@ -282,7 +281,7 @@ mod test {
282281
)?;
283282

284283
let option_1 = tx.next_tuple()?;
285-
assert_eq!(option_1.unwrap().id, Some(1));
284+
assert_eq!(option_1.unwrap().id, Some(Arc::new(DataValue::Int32(Some(2)))));
286285

287286
let option_2 = tx.next_tuple()?;
288287
assert_eq!(option_2, None);

src/storage/memory.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl Table for MemTable {
179179
.as_mut()
180180
}.unwrap();
181181

182-
tuples.retain(|tuple| tuple.id.unwrap() != tuple_id);
182+
tuples.retain(|tuple| tuple.id.clone().unwrap() != tuple_id);
183183

184184
Ok(())
185185
}
@@ -249,15 +249,15 @@ pub(crate) mod test {
249249

250250
pub fn data_filling(columns: Vec<ColumnRef>, table: &mut impl Table) -> Result<(), StorageError> {
251251
table.append(Tuple {
252-
id: Some(0),
252+
id: Some(Arc::new(DataValue::Int32(Some(1)))),
253253
columns: columns.clone(),
254254
values: vec![
255255
Arc::new(DataValue::Int32(Some(1))),
256256
Arc::new(DataValue::Boolean(Some(true)))
257257
],
258258
})?;
259259
table.append(Tuple {
260-
id: Some(1),
260+
id: Some(Arc::new(DataValue::Int32(Some(2)))),
261261
columns: columns.clone(),
262262
values: vec![
263263
Arc::new(DataValue::Int32(Some(2))),
@@ -303,7 +303,7 @@ pub(crate) mod test {
303303
)?;
304304

305305
let option_1 = tx.next_tuple()?;
306-
assert_eq!(option_1.unwrap().id, Some(1));
306+
assert_eq!(option_1.unwrap().id, Some(Arc::new(DataValue::Int32(Some(2)))));
307307

308308
let option_2 = tx.next_tuple()?;
309309
assert_eq!(option_2, None);

0 commit comments

Comments
 (0)