Skip to content

Commit 9a05ed8

Browse files
authored
fix(value): DataValue::to_primary_key has an order reversal problem… (#59)
* fix(value): `DataValue::to_primary_key` has an order reversal problem when the signed integer is negative. e.g. will happen -9 < -100 * feat: supports `UnaryOperator` * feat: supports `Insert Overwrite` * invalid: Throw the failure of data conversion in the `DataValue::cast`
1 parent de224d9 commit 9a05ed8

File tree

17 files changed

+273
-69
lines changed

17 files changed

+273
-69
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ Storage Support:
7575
- [x] Limit
7676
- DML
7777
- [x] Insert
78+
- [x] Insert Overwrite
7879
- [x] Update
7980
- [x] Delete
8081
- DataTypes

src/binder/expr.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::binder::BindError;
22
use itertools::Itertools;
3-
use sqlparser::ast::{BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident};
3+
use sqlparser::ast::{BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident, UnaryOperator};
44
use std::slice;
55
use std::sync::Arc;
66
use async_recursion::async_recursion;
@@ -28,6 +28,7 @@ impl<S: Storage> Binder<S> {
2828
Expr::Value(v) => Ok(ScalarExpression::Constant(Arc::new(v.into()))),
2929
Expr::Function(func) => self.bind_agg_call(func).await,
3030
Expr::Nested(expr) => self.bind_expr(expr).await,
31+
Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op).await,
3132
_ => {
3233
todo!()
3334
}
@@ -126,6 +127,25 @@ impl<S: Storage> Binder<S> {
126127
})
127128
}
128129

130+
async fn bind_unary_op_internal(
131+
&mut self,
132+
expr: &Expr,
133+
op: &UnaryOperator,
134+
) -> Result<ScalarExpression, BindError> {
135+
let expr = Box::new(self.bind_expr(expr).await?);
136+
let ty = if let UnaryOperator::Not = op {
137+
LogicalType::Boolean
138+
} else {
139+
expr.return_type()
140+
};
141+
142+
Ok(ScalarExpression::Unary {
143+
op: (op.clone()).into(),
144+
expr,
145+
ty,
146+
})
147+
}
148+
129149
async fn bind_agg_call(&mut self, func: &Function) -> Result<ScalarExpression, BindError> {
130150
let mut args = Vec::with_capacity(func.args.len());
131151

src/binder/insert.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use sqlparser::ast::{Expr, Ident, ObjectName};
44
use crate::binder::{Binder, BindError, lower_case_name, split_name};
55
use crate::catalog::ColumnRef;
66
use crate::expression::ScalarExpression;
7+
use crate::expression::value_compute::unary_op;
78
use crate::planner::LogicalPlan;
89
use crate::planner::operator::insert::InsertOperator;
910
use crate::planner::operator::Operator;
@@ -16,7 +17,8 @@ impl<S: Storage> Binder<S> {
1617
&mut self,
1718
name: ObjectName,
1819
idents: &[Ident],
19-
expr_rows: &Vec<Vec<Expr>>
20+
expr_rows: &Vec<Vec<Expr>>,
21+
is_overwrite: bool
2022
) -> Result<LogicalPlan, BindError> {
2123
let name = lower_case_name(&name);
2224
let (_, name) = split_name(&name)?;
@@ -45,13 +47,20 @@ impl<S: Storage> Binder<S> {
4547
let mut row = Vec::with_capacity(expr_row.len());
4648

4749
for (i, expr) in expr_row.into_iter().enumerate() {
48-
match self.bind_expr(expr).await? {
50+
match &self.bind_expr(expr).await? {
4951
ScalarExpression::Constant(value) => {
50-
let cast_value = DataValue::clone(&value)
52+
let cast_value = DataValue::clone(value)
5153
.cast(columns[i].datatype())?;
5254

5355
row.push(Arc::new(cast_value))
5456
},
57+
ScalarExpression::Unary { expr, op, .. } => {
58+
if let ScalarExpression::Constant(value) = expr.as_ref() {
59+
row.push(Arc::new(unary_op(value, op)?))
60+
} else {
61+
unreachable!()
62+
}
63+
}
5564
_ => unreachable!(),
5665
}
5766
}
@@ -64,6 +73,7 @@ impl<S: Storage> Binder<S> {
6473
operator: Operator::Insert(
6574
InsertOperator {
6675
table_name,
76+
is_overwrite,
6777
}
6878
),
6979
childrens: vec![values_plan],

src/binder/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,14 @@ impl<S: Storage> Binder<S> {
9090
_ => todo!()
9191
}
9292
}
93-
Statement::Insert { table_name, columns, source, .. } => {
93+
Statement::Insert { table_name, columns, source, overwrite, .. } => {
9494
if let SetExpr::Values(values) = source.body.as_ref() {
95-
self.bind_insert(table_name.to_owned(), columns, &values.rows).await?
95+
self.bind_insert(
96+
table_name.to_owned(),
97+
columns,
98+
&values.rows,
99+
*overwrite
100+
).await?
96101
} else {
97102
todo!()
98103
}

src/db.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ mod test {
184184
let kipsql = Database::with_kipdb(temp_dir.path()).await?;
185185
let _ = kipsql.run("create table t1 (a int primary key, b int, k int)").await?;
186186
let _ = kipsql.run("create table t2 (c int primary key, d int unsigned null, e datetime)").await?;
187-
let _ = kipsql.run("insert into t1 (a, b, k) values (1, 1, 1), (4, 2, 2), (5, 2, 2)").await?;
187+
let _ = kipsql.run("insert into t1 (a, b, k) values (-99, 1, 1), (-1, 2, 2), (5, 2, 2)").await?;
188188
let _ = kipsql.run("insert into t2 (d, c, e) values (2, 1, '2021-05-20 21:00:00'), (3, 4, '2023-09-10 00:00:00')").await?;
189189

190190
println!("full t1:");
@@ -278,11 +278,17 @@ mod test {
278278
println!("{}", create_table(&tuples_distinct_t1));
279279

280280
println!("update t1 with filter:");
281-
let _ = kipsql.run("update t1 set a = 0 where b > 1").await?;
281+
let _ = kipsql.run("update t1 set b = 0 where b > 1").await?;
282282
println!("after t1:");
283283
let update_after_full_t1 = kipsql.run("select * from t1").await?;
284284
println!("{}", create_table(&update_after_full_t1));
285285

286+
println!("insert overwrite t1:");
287+
let _ = kipsql.run("insert overwrite t1 (a, b, k) values (-1, 1, 1)").await?;
288+
println!("after t1:");
289+
let insert_overwrite_after_full_t1 = kipsql.run("select * from t1").await?;
290+
println!("{}", create_table(&insert_overwrite_after_full_t1));
291+
286292
println!("delete t1 with filter:");
287293
let _ = kipsql.run("delete from t1 where b > 1").await?;
288294
println!("after t1:");

src/execution/executor/dml/insert.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ use crate::types::value::{DataValue, ValueRef};
1414
pub struct Insert {
1515
table_name: TableName,
1616
input: BoxedExecutor,
17+
is_overwrite: bool
1718
}
1819

1920
impl From<(InsertOperator, BoxedExecutor)> for Insert {
20-
fn from((InsertOperator { table_name }, input): (InsertOperator, BoxedExecutor)) -> Self {
21+
fn from((InsertOperator { table_name, is_overwrite }, input): (InsertOperator, BoxedExecutor)) -> Self {
2122
Insert {
2223
table_name,
23-
input
24+
input,
25+
is_overwrite,
2426
}
2527
}
2628
}
@@ -34,7 +36,7 @@ impl<S: Storage> Executor<S> for Insert {
3436
impl Insert {
3537
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
3638
pub async fn _execute<S: Storage>(self, storage: S) {
37-
let Insert { table_name, input } = self;
39+
let Insert { table_name, input, is_overwrite } = self;
3840
let mut primary_key_index = None;
3941

4042
if let (Some(table_catalog), Some(mut table)) =
@@ -75,7 +77,7 @@ impl Insert {
7577
tuple.values.push(value)
7678
}
7779

78-
table.append(tuple)?;
80+
table.append(tuple, is_overwrite)?;
7981
}
8082
table.commit().await?;
8183
}

src/execution/executor/dml/update.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,21 @@ impl Update {
4848
#[for_await]
4949
for tuple in input {
5050
let mut tuple = tuple?;
51+
let mut is_overwrite = true;
5152

5253
for (i, column) in tuple.columns.iter().enumerate() {
5354
if let Some(value) = value_map.get(&column.id) {
5455
if column.desc.is_primary {
5556
if let Some(old_key) = tuple.id.replace(value.clone()) {
56-
if value != &old_key {
57-
table.delete(old_key)?;
58-
}
57+
table.delete(old_key)?;
58+
is_overwrite = false;
5959
}
6060
}
6161
tuple.values[i] = value.clone();
6262
}
6363
}
6464

65-
table.append(tuple)?;
65+
table.append(tuple, is_overwrite)?;
6666
}
6767
table.commit().await?;
6868
}

src/expression/evaluator.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22
use itertools::Itertools;
3-
use crate::expression::value_compute::binary_op;
3+
use crate::expression::value_compute::{binary_op, unary_op};
44
use crate::expression::ScalarExpression;
55
use crate::types::errors::TypeError;
66
use crate::types::tuple::Tuple;
@@ -35,7 +35,11 @@ impl ScalarExpression {
3535
ScalarExpression::IsNull{ expr } => {
3636
Ok(Arc::new(DataValue::Boolean(Some(expr.nullable()))))
3737
}
38-
ScalarExpression::Unary{ .. } => todo!(),
38+
ScalarExpression::Unary{ expr, op, .. } => {
39+
let value = expr.eval_column(tuple)?;
40+
41+
Ok(Arc::new(unary_op(&value, op)?))
42+
},
3943
ScalarExpression::AggCall{ .. } => todo!()
4044
}
4145
}

src/expression/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,16 @@ pub enum UnaryOperator {
239239
Plus,
240240
Minus,
241241
Not,
242-
True,
243-
False,
244242
}
245243

246244
impl From<SqlUnaryOperator> for UnaryOperator {
247245
fn from(value: SqlUnaryOperator) -> Self {
248-
todo!()
246+
match value {
247+
SqlUnaryOperator::Plus => UnaryOperator::Plus,
248+
SqlUnaryOperator::Minus => UnaryOperator::Minus,
249+
SqlUnaryOperator::Not => UnaryOperator::Not,
250+
_ => unimplemented!("not support!")
251+
}
249252
}
250253
}
251254

@@ -311,7 +314,7 @@ impl From<SqlBinaryOperator> for BinaryOperator {
311314
SqlBinaryOperator::And => BinaryOperator::And,
312315
SqlBinaryOperator::Or => BinaryOperator::Or,
313316
SqlBinaryOperator::Xor => BinaryOperator::Xor,
314-
_ => todo!()
317+
_ => unimplemented!("not support!")
315318
}
316319
}
317320
}

src/expression/value_compute.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::expression::BinaryOperator;
1+
use crate::expression::{BinaryOperator, UnaryOperator};
22
use crate::types::errors::TypeError;
33
use crate::types::LogicalType;
44
use crate::types::value::DataValue;
@@ -59,6 +59,52 @@ fn unpack_date(value: DataValue) -> Option<i64> {
5959
}
6060
}
6161

62+
pub fn unary_op(
63+
value: &DataValue,
64+
op: &UnaryOperator,
65+
) -> Result<DataValue, TypeError> {
66+
let mut value_type = value.logical_type();
67+
let mut value = value.clone();
68+
69+
if value_type.is_numeric() && matches!(op, UnaryOperator::Plus | UnaryOperator::Minus) {
70+
if value_type.is_unsigned_numeric() {
71+
match value_type {
72+
LogicalType::UTinyint => value_type = LogicalType::Tinyint,
73+
LogicalType::USmallint => value_type = LogicalType::Smallint,
74+
LogicalType::UInteger => value_type = LogicalType::Integer,
75+
LogicalType::UBigint => value_type = LogicalType::Bigint,
76+
_ => unreachable!()
77+
};
78+
value = value.cast(&value_type)?;
79+
}
80+
81+
let result = match op {
82+
UnaryOperator::Plus => value,
83+
UnaryOperator::Minus => {
84+
match value {
85+
DataValue::Float32(option) => DataValue::Float32(option.map(|v| -v)),
86+
DataValue::Float64(option) => DataValue::Float64(option.map(|v| -v)),
87+
DataValue::Int8(option) => DataValue::Int8(option.map(|v| -v)),
88+
DataValue::Int16(option) => DataValue::Int16(option.map(|v| -v)),
89+
DataValue::Int32(option) => DataValue::Int32(option.map(|v| -v)),
90+
DataValue::Int64(option) => DataValue::Int64(option.map(|v| -v)),
91+
_ => unreachable!()
92+
}
93+
}
94+
_ => unreachable!()
95+
};
96+
97+
Ok(result)
98+
} else if matches!((value_type, op), (LogicalType::Boolean, UnaryOperator::Not)) {
99+
match value {
100+
DataValue::Boolean(option) => Ok(DataValue::Boolean(option.map(|v| !v))),
101+
_ => unreachable!()
102+
}
103+
} else {
104+
Err(TypeError::InvalidType)
105+
}
106+
}
107+
62108
/// Tips:
63109
/// - Null values operate as null values
64110
pub fn binary_op(

0 commit comments

Comments
 (0)