@@ -208,4 +208,188 @@ impl HashJoin {
208208
209209 hash_random_state. hash_one ( values)
210210 }
211+ }
212+
213+ #[ cfg( test) ]
214+ mod test {
215+ use std:: sync:: Arc ;
216+ use itertools:: Itertools ;
217+ use crate :: catalog:: { ColumnCatalog , ColumnDesc } ;
218+ use crate :: execution:: executor:: { BoxedExecutor , Executor , try_collect} ;
219+ use crate :: execution:: executor:: dql:: join:: hash_join:: HashJoin ;
220+ use crate :: execution:: executor:: dql:: values:: Values ;
221+ use crate :: execution:: ExecutorError ;
222+ use crate :: expression:: ScalarExpression ;
223+ use crate :: planner:: operator:: join:: { JoinCondition , JoinOperator , JoinType } ;
224+ use crate :: planner:: operator:: values:: ValuesOperator ;
225+ use crate :: storage:: memory:: MemStorage ;
226+ use crate :: storage:: Storage ;
227+ use crate :: types:: LogicalType ;
228+ use crate :: types:: tuple:: create_table;
229+ use crate :: types:: value:: { DataValue , ValueRef } ;
230+
231+ fn build_join_values < S : Storage > ( _s : & S ) -> ( Vec < ( ScalarExpression , ScalarExpression ) > , BoxedExecutor , BoxedExecutor ) {
232+ let desc = ColumnDesc :: new ( LogicalType :: Integer , false ) ;
233+
234+ let t1_columns = vec ! [
235+ Arc :: new( ColumnCatalog :: new( "c1" . to_string( ) , true , desc. clone( ) ) ) ,
236+ Arc :: new( ColumnCatalog :: new( "c2" . to_string( ) , true , desc. clone( ) ) ) ,
237+ Arc :: new( ColumnCatalog :: new( "c3" . to_string( ) , true , desc. clone( ) ) ) ,
238+ ] ;
239+
240+ let t2_columns = vec ! [
241+ Arc :: new( ColumnCatalog :: new( "c4" . to_string( ) , true , desc. clone( ) ) ) ,
242+ Arc :: new( ColumnCatalog :: new( "c5" . to_string( ) , true , desc. clone( ) ) ) ,
243+ Arc :: new( ColumnCatalog :: new( "c6" . to_string( ) , true , desc. clone( ) ) ) ,
244+ ] ;
245+
246+ let on_keys = vec ! [
247+ ( ScalarExpression :: ColumnRef ( t1_columns[ 0 ] . clone( ) ) , ScalarExpression :: ColumnRef ( t2_columns[ 0 ] . clone( ) ) )
248+ ] ;
249+
250+ let values_t1 = Values :: from ( ValuesOperator {
251+ rows : vec ! [
252+ vec![
253+ Arc :: new( DataValue :: Int32 ( Some ( 0 ) ) ) ,
254+ Arc :: new( DataValue :: Int32 ( Some ( 2 ) ) ) ,
255+ Arc :: new( DataValue :: Int32 ( Some ( 4 ) ) ) ,
256+ ] ,
257+ vec![
258+ Arc :: new( DataValue :: Int32 ( Some ( 1 ) ) ) ,
259+ Arc :: new( DataValue :: Int32 ( Some ( 3 ) ) ) ,
260+ Arc :: new( DataValue :: Int32 ( Some ( 5 ) ) ) ,
261+ ] ,
262+ vec![
263+ Arc :: new( DataValue :: Int32 ( Some ( 3 ) ) ) ,
264+ Arc :: new( DataValue :: Int32 ( Some ( 5 ) ) ) ,
265+ Arc :: new( DataValue :: Int32 ( Some ( 7 ) ) ) ,
266+ ]
267+ ] ,
268+ columns : t1_columns,
269+ } ) ;
270+
271+ let values_t2 = Values :: from ( ValuesOperator {
272+ rows : vec ! [
273+ vec![
274+ Arc :: new( DataValue :: Int32 ( Some ( 0 ) ) ) ,
275+ Arc :: new( DataValue :: Int32 ( Some ( 2 ) ) ) ,
276+ Arc :: new( DataValue :: Int32 ( Some ( 4 ) ) ) ,
277+ ] ,
278+ vec![
279+ Arc :: new( DataValue :: Int32 ( Some ( 1 ) ) ) ,
280+ Arc :: new( DataValue :: Int32 ( Some ( 3 ) ) ) ,
281+ Arc :: new( DataValue :: Int32 ( Some ( 5 ) ) ) ,
282+ ] ,
283+ vec![
284+ Arc :: new( DataValue :: Int32 ( Some ( 4 ) ) ) ,
285+ Arc :: new( DataValue :: Int32 ( Some ( 6 ) ) ) ,
286+ Arc :: new( DataValue :: Int32 ( Some ( 8 ) ) ) ,
287+ ] ,
288+ vec![
289+ Arc :: new( DataValue :: Int32 ( Some ( 1 ) ) ) ,
290+ Arc :: new( DataValue :: Int32 ( Some ( 1 ) ) ) ,
291+ Arc :: new( DataValue :: Int32 ( Some ( 1 ) ) ) ,
292+ ] ,
293+ ] ,
294+ columns : t2_columns,
295+ } ) ;
296+
297+
298+
299+ ( on_keys, values_t1. execute ( _s) , values_t2. execute ( _s) )
300+ }
301+
302+ fn build_integers ( ints : Vec < Option < i32 > > ) -> Vec < ValueRef > {
303+ ints. into_iter ( )
304+ . map ( |i| Arc :: new ( DataValue :: Int32 ( i) ) )
305+ . collect_vec ( )
306+ }
307+
308+ #[ tokio:: test]
309+ async fn test_inner_join ( ) -> Result < ( ) , ExecutorError > {
310+ let mem_storage = MemStorage :: new ( ) ;
311+ let ( keys, left, right) = build_join_values ( & mem_storage) ;
312+
313+ let op = JoinOperator {
314+ on : JoinCondition :: On { on : keys, filter : None } ,
315+ join_type : JoinType :: Inner ,
316+ } ;
317+ let mut executor = HashJoin :: from ( ( op, left, right) ) . execute ( & mem_storage) ;
318+ let tuples = try_collect ( & mut executor) . await ?;
319+
320+ println ! ( "inner_test: \n {}" , create_table( & tuples) ) ;
321+
322+ assert_eq ! ( tuples[ 0 ] . values, build_integers( vec![ Some ( 0 ) , Some ( 2 ) , Some ( 4 ) , Some ( 0 ) , Some ( 2 ) , Some ( 4 ) ] ) ) ;
323+ assert_eq ! ( tuples[ 1 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 3 ) , Some ( 5 ) ] ) ) ;
324+ assert_eq ! ( tuples[ 2 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 1 ) , Some ( 1 ) ] ) ) ;
325+
326+ Ok ( ( ) )
327+ }
328+
329+ #[ tokio:: test]
330+ async fn test_left_join ( ) -> Result < ( ) , ExecutorError > {
331+ let mem_storage = MemStorage :: new ( ) ;
332+ let ( keys, left, right) = build_join_values ( & mem_storage) ;
333+
334+ let op = JoinOperator {
335+ on : JoinCondition :: On { on : keys, filter : None } ,
336+ join_type : JoinType :: Left ,
337+ } ;
338+ let mut executor = HashJoin :: from ( ( op, left, right) ) . execute ( & mem_storage) ;
339+ let tuples = try_collect ( & mut executor) . await ?;
340+
341+ println ! ( "left_test: \n {}" , create_table( & tuples) ) ;
342+
343+ assert_eq ! ( tuples[ 0 ] . values, build_integers( vec![ Some ( 0 ) , Some ( 2 ) , Some ( 4 ) , Some ( 0 ) , Some ( 2 ) , Some ( 4 ) ] ) ) ;
344+ assert_eq ! ( tuples[ 1 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 3 ) , Some ( 5 ) ] ) ) ;
345+ assert_eq ! ( tuples[ 2 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 1 ) , Some ( 1 ) ] ) ) ;
346+ assert_eq ! ( tuples[ 3 ] . values, build_integers( vec![ Some ( 3 ) , Some ( 5 ) , Some ( 7 ) , None , None , None ] ) ) ;
347+
348+ Ok ( ( ) )
349+ }
350+
351+ #[ tokio:: test]
352+ async fn test_right_join ( ) -> Result < ( ) , ExecutorError > {
353+ let mem_storage = MemStorage :: new ( ) ;
354+ let ( keys, left, right) = build_join_values ( & mem_storage) ;
355+
356+ let op = JoinOperator {
357+ on : JoinCondition :: On { on : keys, filter : None } ,
358+ join_type : JoinType :: Right ,
359+ } ;
360+ let mut executor = HashJoin :: from ( ( op, left, right) ) . execute ( & mem_storage) ;
361+ let tuples = try_collect ( & mut executor) . await ?;
362+
363+ println ! ( "right_test: \n {}" , create_table( & tuples) ) ;
364+
365+ assert_eq ! ( tuples[ 0 ] . values, build_integers( vec![ Some ( 0 ) , Some ( 2 ) , Some ( 4 ) , Some ( 0 ) , Some ( 2 ) , Some ( 4 ) ] ) ) ;
366+ assert_eq ! ( tuples[ 1 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 3 ) , Some ( 5 ) ] ) ) ;
367+ assert_eq ! ( tuples[ 2 ] . values, build_integers( vec![ None , None , None , Some ( 4 ) , Some ( 6 ) , Some ( 8 ) ] ) ) ;
368+ assert_eq ! ( tuples[ 3 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 1 ) , Some ( 1 ) ] ) ) ;
369+
370+ Ok ( ( ) )
371+ }
372+
373+ #[ tokio:: test]
374+ async fn test_full_join ( ) -> Result < ( ) , ExecutorError > {
375+ let mem_storage = MemStorage :: new ( ) ;
376+ let ( keys, left, right) = build_join_values ( & mem_storage) ;
377+
378+ let op = JoinOperator {
379+ on : JoinCondition :: On { on : keys, filter : None } ,
380+ join_type : JoinType :: Full ,
381+ } ;
382+ let mut executor = HashJoin :: from ( ( op, left, right) ) . execute ( & mem_storage) ;
383+ let tuples = try_collect ( & mut executor) . await ?;
384+
385+ println ! ( "full_test: \n {}" , create_table( & tuples) ) ;
386+
387+ assert_eq ! ( tuples[ 0 ] . values, build_integers( vec![ Some ( 0 ) , Some ( 2 ) , Some ( 4 ) , Some ( 0 ) , Some ( 2 ) , Some ( 4 ) ] ) ) ;
388+ assert_eq ! ( tuples[ 1 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 3 ) , Some ( 5 ) ] ) ) ;
389+ assert_eq ! ( tuples[ 2 ] . values, build_integers( vec![ None , None , None , Some ( 4 ) , Some ( 6 ) , Some ( 8 ) ] ) ) ;
390+ assert_eq ! ( tuples[ 3 ] . values, build_integers( vec![ Some ( 1 ) , Some ( 3 ) , Some ( 5 ) , Some ( 1 ) , Some ( 1 ) , Some ( 1 ) ] ) ) ;
391+ assert_eq ! ( tuples[ 4 ] . values, build_integers( vec![ Some ( 3 ) , Some ( 5 ) , Some ( 7 ) , None , None , None ] ) ) ;
392+
393+ Ok ( ( ) )
394+ }
211395}
0 commit comments