Skip to content

Commit 3d669b2

Browse files
committed
fix: many bugs
- when `Project` is pushed down, `InputRefs` loses its related `ColumnRef` information - when the `Project` is pushed down to Scan, the `Alias` content may not be `ColumnRef`, causing problems. - `Binary` expression implements `output_columns` - `max_logic_type` returns the `DateTime` type for a combination of `DateTime` and `VarChar` - `raw_len` of `DateTime` returns incorrectly
1 parent d59cca2 commit 3d669b2

File tree

11 files changed

+124
-33
lines changed

11 files changed

+124
-33
lines changed

src/db.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,10 @@ mod test {
263263
let tuples_time_max = kipsql.run("select max(e) as max_time from t2").await?;
264264
println!("{}", create_table(&tuples_time_max));
265265

266+
println!("time where:");
267+
let tuples_time_where_t2 = kipsql.run("select (c + 1) from t2 where e > '2021-05-20 21:00:00'").await?;
268+
println!("{}", create_table(&tuples_time_where_t2));
269+
266270
assert!(kipsql.run("select max(d) from t2 group by c").await.is_err());
267271

268272
println!("distinct t1:");

src/execution/executor/dql/aggregate/hash_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl HashAggExecutor {
4848
self.agg_calls
4949
.iter()
5050
.chain(self.groupby_exprs.iter())
51-
.map(|expr| expr.output_column(&tuple))
51+
.map(|expr| expr.output_columns(&tuple))
5252
.collect_vec()
5353
});
5454

src/execution/executor/dql/aggregate/simple_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl SimpleAggExecutor {
4242
columns_option.get_or_insert_with(|| {
4343
self.agg_calls
4444
.iter()
45-
.map(|expr| expr.output_column(&tuple))
45+
.map(|expr| expr.output_columns(&tuple))
4646
.collect_vec()
4747
});
4848

src/execution/executor/dql/projection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl Projection {
4040

4141
for expr in exprs.iter() {
4242
values.push(expr.eval_column(&tuple));
43-
columns.push(expr.output_column(&tuple));
43+
columns.push(expr.output_columns(&tuple));
4444
}
4545

4646
yield Tuple { id: None, columns, values, };

src/expression/mod.rs

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::fmt;
2+
use std::fmt::Formatter;
13
use std::sync::Arc;
24
use itertools::Itertools;
35

@@ -59,6 +61,14 @@ pub enum ScalarExpression {
5961
}
6062

6163
impl ScalarExpression {
64+
pub fn unpack_alias(&self) -> &ScalarExpression {
65+
if let ScalarExpression::Alias { expr, .. } = self {
66+
expr.unpack_alias()
67+
} else {
68+
self
69+
}
70+
}
71+
6272
pub fn nullable(&self) -> bool {
6373
match self {
6474
ScalarExpression::Constant(_) => false,
@@ -154,14 +164,14 @@ impl ScalarExpression {
154164
}
155165
}
156166

157-
pub fn output_column(&self, tuple: &Tuple) -> ColumnRef {
167+
pub fn output_columns(&self, tuple: &Tuple) -> ColumnRef {
158168
match self {
159169
ScalarExpression::ColumnRef(col) => {
160170
col.clone()
161171
}
162172
ScalarExpression::Constant(value) => {
163173
Arc::new(ColumnCatalog::new(
164-
String::new(),
174+
format!("{}", value),
165175
true,
166176
ColumnDesc::new(value.logical_type(), false)
167177
))
@@ -175,7 +185,7 @@ impl ScalarExpression {
175185
}
176186
ScalarExpression::AggCall { kind, args, ty, distinct } => {
177187
let args_str = args.iter()
178-
.map(|expr| expr.output_column(tuple).name.clone())
188+
.map(|expr| expr.output_columns(tuple).name.clone())
179189
.join(", ");
180190
let op = |allow_distinct, distinct| {
181191
if allow_distinct && distinct {
@@ -200,6 +210,25 @@ impl ScalarExpression {
200210
ScalarExpression::InputRef { index, .. } => {
201211
tuple.columns[*index].clone()
202212
}
213+
ScalarExpression::Binary {
214+
left_expr,
215+
right_expr,
216+
op,
217+
ty
218+
} => {
219+
let column_name = format!(
220+
"({} {} {})",
221+
left_expr.output_columns(tuple).name,
222+
op,
223+
right_expr.output_columns(tuple).name,
224+
);
225+
226+
Arc::new(ColumnCatalog::new(
227+
column_name,
228+
true,
229+
ColumnDesc::new(ty.clone(), false)
230+
))
231+
}
203232
_ => unreachable!()
204233
}
205234
}
@@ -238,9 +267,29 @@ pub enum BinaryOperator {
238267
And,
239268
Or,
240269
Xor,
241-
BitwiseOr,
242-
BitwiseAnd,
243-
BitwiseXor,
270+
}
271+
272+
impl fmt::Display for BinaryOperator {
273+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
274+
match self {
275+
BinaryOperator::Plus => write!(f, "+"),
276+
BinaryOperator::Minus => write!(f, "-"),
277+
BinaryOperator::Multiply => write!(f, "*"),
278+
BinaryOperator::Divide => write!(f, "/"),
279+
BinaryOperator::Modulo => write!(f, "mod"),
280+
BinaryOperator::StringConcat => write!(f, "&"),
281+
BinaryOperator::Gt => write!(f, ">"),
282+
BinaryOperator::Lt => write!(f, "<"),
283+
BinaryOperator::GtEq => write!(f, ">="),
284+
BinaryOperator::LtEq => write!(f, "<="),
285+
BinaryOperator::Spaceship => write!(f, "<=>"),
286+
BinaryOperator::Eq => write!(f, "="),
287+
BinaryOperator::NotEq => write!(f, "!="),
288+
BinaryOperator::And => write!(f, "&&"),
289+
BinaryOperator::Or => write!(f, "||"),
290+
BinaryOperator::Xor => write!(f, "^"),
291+
}
292+
}
244293
}
245294

246295
impl From<SqlBinaryOperator> for BinaryOperator {
@@ -262,9 +311,6 @@ impl From<SqlBinaryOperator> for BinaryOperator {
262311
SqlBinaryOperator::And => BinaryOperator::And,
263312
SqlBinaryOperator::Or => BinaryOperator::Or,
264313
SqlBinaryOperator::Xor => BinaryOperator::Xor,
265-
SqlBinaryOperator::BitwiseOr => BinaryOperator::BitwiseOr,
266-
SqlBinaryOperator::BitwiseAnd => BinaryOperator::BitwiseAnd,
267-
SqlBinaryOperator::BitwiseXor => BinaryOperator::BitwiseXor,
268314
_ => todo!()
269315
}
270316
}

src/optimizer/rule/column_pruning.rs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ impl Rule for PushProjectIntoScan {
5151

5252
new_scan_op.columns = project_op.columns
5353
.iter()
54-
.filter(|expr| matches!(
55-
expr,
56-
ScalarExpression::ColumnRef(_) | ScalarExpression::Alias { .. }
57-
))
54+
.filter(|expr| matches!(expr.unpack_alias(),ScalarExpression::ColumnRef(_)))
5855
.cloned()
5956
.collect_vec();
6057

@@ -82,16 +79,27 @@ impl Rule for PushProjectThroughChild {
8279

8380
if let Operator::Project(_) = node_operator {
8481
let child_index = graph.children_at(node_id)[0];
85-
let node_referenced_columns = node_operator.referenced_columns();
82+
let mut node_referenced_columns = node_operator.referenced_columns();
83+
let child_operator = graph.operator(child_index);
84+
let child_referenced_columns = child_operator.referenced_columns();
85+
let is_child_agg = matches!(child_operator, Operator::Aggregate(_));
86+
87+
// When the aggregate function is a child node,
88+
// the pushdown will lose the corresponding ColumnRef due to `InputRef`.
89+
// Therefore, it is necessary to map the InputRef to the corresponding ColumnRef
90+
// and push it down.
91+
if is_child_agg && !input_refs.is_empty() {
92+
node_referenced_columns.append(
93+
&mut child_operator.agg_mapping_col_refs(&input_refs)
94+
)
95+
}
8696

87-
let intersection_columns_ids = graph
88-
.operator(child_index)
89-
.referenced_columns()
90-
.into_iter()
97+
let intersection_columns_ids = child_referenced_columns
98+
.iter()
9199
.map(|col| col.id)
92100
.chain(
93101
node_referenced_columns
94-
.into_iter()
102+
.iter()
95103
.map(|col| col.id)
96104
)
97105
.collect::<HashSet<ColumnId>>();
@@ -101,25 +109,28 @@ impl Rule for PushProjectThroughChild {
101109
}
102110

103111
for grandson_id in graph.children_at(child_index) {
104-
let columns = graph.operator(grandson_id)
112+
let mut columns = graph.operator(grandson_id)
105113
.referenced_columns()
106114
.into_iter()
107115
.unique_by(|col| col.id)
108116
.filter(|u| intersection_columns_ids.contains(&u.id))
109117
.map(|col| ScalarExpression::ColumnRef(col))
110118
.collect_vec();
111-
// Tips: Aggregation fields take precedence
112-
let full_columns = input_refs.iter()
113-
.cloned()
114-
.chain(columns)
115-
.collect_vec();
116119

117-
if !full_columns.is_empty() {
120+
if !is_child_agg && !input_refs.is_empty() {
121+
// Tips: Aggregation InputRefs fields take precedence
122+
columns = input_refs.iter()
123+
.cloned()
124+
.chain(columns)
125+
.collect_vec();
126+
}
127+
128+
if !columns.is_empty() {
118129
graph.add_node(
119130
child_index,
120131
Some(grandson_id),
121132
OptExprNode::OperatorRef(
122-
Operator::Project(ProjectOperator { columns: full_columns })
133+
Operator::Project(ProjectOperator { columns })
123134
)
124135
);
125136
}

src/planner/operator/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ impl Operator {
5858
Operator::Project(op) => {
5959
op.columns
6060
.iter()
61+
.map(ScalarExpression::unpack_alias)
6162
.filter(|expr| matches!(expr, ScalarExpression::InputRef { .. }))
6263
.cloned()
6364
.collect_vec()
@@ -66,6 +67,25 @@ impl Operator {
6667
}
6768
}
6869

70+
pub fn agg_mapping_col_refs(&self, input_refs: &[ScalarExpression]) -> Vec<ColumnRef> {
71+
match self {
72+
Operator::Aggregate(AggregateOperator { agg_calls, .. }) => {
73+
input_refs.iter()
74+
.filter_map(|expr| {
75+
if let ScalarExpression::InputRef { index, .. } = expr {
76+
Some(agg_calls[*index].clone())
77+
} else {
78+
None
79+
}
80+
})
81+
.map(|expr| expr.referenced_columns())
82+
.flatten()
83+
.collect_vec()
84+
}
85+
_ => vec![],
86+
}
87+
}
88+
6989
pub fn referenced_columns(&self) -> Vec<ColumnRef> {
7090
match self {
7191
Operator::Aggregate(op) => {

src/storage/kip.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ impl Transaction for KipTraction<'_> {
218218

219219
for expr in self.projections.iter() {
220220
values.push(expr.eval_column(&tuple));
221-
columns.push(expr.output_column(&tuple));
221+
columns.push(expr.output_columns(&tuple));
222222
}
223223

224224
self.limit = self.limit.map(|num| num - 1);

src/storage/memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ impl Transaction for MemTraction<'_> {
220220

221221
for expr in self.projections.iter() {
222222
values.push(expr.eval_column(&tuple));
223-
columns.push(expr.output_column(&tuple));
223+
columns.push(expr.output_columns(&tuple));
224224
}
225225

226226
self.limit = self.limit.map(|num| num - 1);

src/types/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl LogicalType {
7575
LogicalType::Float => Some(4),
7676
LogicalType::Double => Some(8),
7777
LogicalType::Varchar => None,
78-
LogicalType::Date => Some(4),
78+
LogicalType::Date => Some(8),
7979
}
8080
}
8181

@@ -146,6 +146,9 @@ impl LogicalType {
146146
if left.is_numeric() && right.is_numeric() {
147147
return LogicalType::combine_numeric_types(left, right);
148148
}
149+
if matches!((left, right), (LogicalType::Date, LogicalType::Varchar) | (LogicalType::Varchar, LogicalType::Date)) {
150+
return Ok(LogicalType::Date);
151+
}
149152
Err(TypeError::InternalError(format!(
150153
"can not compare two types: {:?} and {:?}",
151154
left, right

0 commit comments

Comments
 (0)