@@ -16,15 +16,13 @@ use std::sync::Arc;
1616use std:: sync:: PoisonError ;
1717
1818use databend_common_base:: base:: ProgressValues ;
19- use databend_common_column:: bitmap:: MutableBitmap ;
2019use databend_common_exception:: Result ;
2120use databend_common_expression:: BlockEntry ;
2221use databend_common_expression:: Column ;
2322use databend_common_expression:: DataBlock ;
2423use databend_common_hashtable:: RowPtr ;
2524
2625use crate :: pipelines:: processors:: transforms:: new_hash_join:: join:: EmptyJoinStream ;
27- use crate :: pipelines:: processors:: transforms:: new_hash_join:: join:: OneBlockJoinStream ;
2826use crate :: pipelines:: processors:: transforms:: BasicHashJoinState ;
2927use crate :: pipelines:: processors:: transforms:: HashJoinHashTable ;
3028use crate :: pipelines:: processors:: transforms:: Join ;
@@ -40,11 +38,11 @@ pub struct NestedLoopJoin<T> {
4038}
4139
4240impl < T > NestedLoopJoin < T > {
43- pub fn create ( inner : T , state : Arc < BasicHashJoinState > , desc : NestedLoopDesc ) -> Self {
41+ pub fn new ( inner : T , state : Arc < BasicHashJoinState > , desc : NestedLoopDesc ) -> Self {
4442 Self { inner, state, desc }
4543 }
4644
47- fn finalize_chunks ( & mut self ) {
45+ fn finalize_chunks ( & self ) {
4846 if !self . state . columns . is_empty ( ) {
4947 return ;
5048 }
@@ -79,87 +77,6 @@ impl<T> NestedLoopJoin<T> {
7977 } )
8078 . collect ( ) ;
8179 }
82-
83- fn handle_block ( & mut self , data : DataBlock ) -> Result < Option < DataBlock > > {
84- let HashJoinHashTable :: NestedLoop ( build_blocks) = & * self . state . hash_table else {
85- unreachable ! ( )
86- } ;
87-
88- let probe_rows = data. num_rows ( ) ;
89- let mut matched = Vec :: with_capacity ( probe_rows) ;
90- for ( chunk_index, build) in build_blocks. iter ( ) . enumerate ( ) {
91- for row_index in 0 ..build. num_rows ( ) {
92- let entries = data
93- . columns ( )
94- . iter ( )
95- . cloned ( )
96- . chain ( build. columns ( ) . iter ( ) . map ( |entry| {
97- BlockEntry :: Const (
98- entry. index ( row_index) . unwrap ( ) . to_owned ( ) ,
99- entry. data_type ( ) ,
100- probe_rows,
101- )
102- } ) )
103- . collect ( ) ;
104- let result_count = self
105- . desc
106- . filter
107- . select ( & DataBlock :: new ( entries, probe_rows) ) ?;
108-
109- matched. extend (
110- self . desc . filter . true_selection ( ) [ ..result_count]
111- . iter ( )
112- . copied ( )
113- . map ( |probe| {
114- ( probe, RowPtr {
115- chunk_index : chunk_index as _ ,
116- row_index : row_index as _ ,
117- } )
118- } ) ,
119- ) ;
120- }
121- }
122-
123- if matched. is_empty ( ) {
124- return Ok ( None ) ;
125- }
126-
127- let mut bitmap = MutableBitmap :: with_capacity ( matched. len ( ) ) ;
128- for ( i, _) in & matched {
129- bitmap. set ( * i as _ , true ) ;
130- }
131- let probe = data. filter_with_bitmap ( & bitmap. freeze ( ) ) ?;
132-
133- matched. sort_by_key ( |( v, _) | * v) ;
134- let indices = matched. into_iter ( ) . map ( |( _, row) | row) . collect :: < Vec < _ > > ( ) ;
135-
136- let build_entries = self
137- . state
138- . columns
139- . iter ( )
140- . zip ( & * self . state . column_types )
141- . map ( |( columns, data_type) | {
142- Column :: take_column_vec_indices ( columns, data_type. clone ( ) , & indices, indices. len ( ) )
143- . into ( )
144- } ) ;
145-
146- let data_block = DataBlock :: from_iter (
147- probe. take_columns ( ) . into_iter ( ) . chain ( build_entries) ,
148- indices. len ( ) ,
149- ) ;
150-
151- let Some ( field_reorder) = & self . desc . field_reorder else {
152- return Ok ( Some ( data_block) ) ;
153- } ;
154- let data_block = DataBlock :: from_iter (
155- field_reorder
156- . iter ( )
157- . map ( |offset| data_block. get_by_offset ( * offset) . clone ( ) ) ,
158- data_block. num_rows ( ) ,
159- ) ;
160-
161- Ok ( Some ( data_block) )
162- }
16380}
16481
16582impl < T : Join > Join for NestedLoopJoin < T > {
@@ -180,12 +97,143 @@ impl<T: Join> Join for NestedLoopJoin<T> {
18097 return Ok ( Box :: new ( EmptyJoinStream ) ) ;
18198 }
18299
183- if ! matches ! ( * self . state. hash_table, HashJoinHashTable :: NestedLoop ( _ ) ) {
100+ let HashJoinHashTable :: NestedLoop ( build_blocks ) = & * self . state . hash_table else {
184101 return self . inner . probe_block ( data) ;
102+ } ;
103+ self . finalize_chunks ( ) ;
104+
105+ let max_block_size = self . desc . filter . max_block_size ( ) ;
106+ Ok ( Box :: new ( NestedLoopJoinStream {
107+ probe_block : data,
108+ build_blocks,
109+ state : & self . state ,
110+ max_block_size,
111+ desc : & mut self . desc ,
112+ matches : Vec :: with_capacity ( max_block_size) ,
113+ build_block_index : 0 ,
114+ build_row_index : 0 ,
115+ } ) )
116+ }
117+ }
118+
119+ struct NestedLoopJoinStream < ' a > {
120+ probe_block : DataBlock ,
121+ build_blocks : & ' a [ DataBlock ] ,
122+ state : & ' a BasicHashJoinState ,
123+ desc : & ' a mut NestedLoopDesc ,
124+ max_block_size : usize ,
125+ build_block_index : usize ,
126+ build_row_index : usize ,
127+ matches : Vec < ( u32 , RowPtr ) > ,
128+ }
129+
130+ impl < ' a > NestedLoopJoinStream < ' a > {
131+ fn process_next_row ( & mut self ) -> Result < ( ) > {
132+ let build_block = & self . build_blocks [ self . build_block_index ] ;
133+
134+ let probe_rows = self . probe_block . num_rows ( ) ;
135+ let entries = self
136+ . probe_block
137+ . columns ( )
138+ . iter ( )
139+ . cloned ( )
140+ . chain ( build_block. columns ( ) . iter ( ) . map ( |entry| {
141+ BlockEntry :: Const (
142+ entry. index ( self . build_row_index ) . unwrap ( ) . to_owned ( ) ,
143+ entry. data_type ( ) ,
144+ probe_rows,
145+ )
146+ } ) )
147+ . collect ( ) ;
148+
149+ let result_count = self
150+ . desc
151+ . filter
152+ . select ( & DataBlock :: new ( entries, probe_rows) ) ?;
153+ let row_ptr = RowPtr {
154+ chunk_index : self . build_block_index as u32 ,
155+ row_index : self . build_row_index as u32 ,
156+ } ;
157+ self . matches . extend (
158+ self . desc . filter . true_selection ( ) [ ..result_count]
159+ . iter ( )
160+ . map ( |probe| ( * probe, row_ptr) ) ,
161+ ) ;
162+
163+ self . build_row_index += 1 ;
164+ if self . build_row_index >= build_block. num_rows ( ) {
165+ self . build_row_index = 0 ;
166+ self . build_block_index += 1 ;
185167 }
186168
187- self . finalize_chunks ( ) ;
169+ Ok ( ( ) )
170+ }
171+
172+ fn emit_block ( & mut self , count : usize ) -> Result < DataBlock > {
173+ self . matches . sort_by_key ( |( probe, _) | * probe) ;
174+
175+ let block = {
176+ let ( probe_indices, build_indices) : ( Vec < _ > , Vec < _ > ) =
177+ self . matches . drain ( ..count) . unzip ( ) ;
178+
179+ let probe = self
180+ . probe_block
181+ . clone ( )
182+ . project ( & self . desc . projections )
183+ . take ( & probe_indices) ?;
184+
185+ let build_entries = self
186+ . state
187+ . columns
188+ . iter ( )
189+ . zip ( self . state . column_types . as_slice ( ) )
190+ . enumerate ( )
191+ . filter_map ( |( i, x) | {
192+ let i = self . probe_block . num_columns ( ) + i;
193+ self . desc . projections . contains ( & i) . then_some ( x)
194+ } )
195+ . map ( |( columns, data_type) | {
196+ Column :: take_column_vec_indices (
197+ columns,
198+ data_type. clone ( ) ,
199+ & build_indices,
200+ count,
201+ )
202+ . into ( )
203+ } ) ;
204+
205+ DataBlock :: from_iter ( probe. take_columns ( ) . into_iter ( ) . chain ( build_entries) , count)
206+ } ;
188207
189- Ok ( Box :: new ( OneBlockJoinStream ( self . handle_block ( data) ?) ) )
208+ if let Some ( field_reorder) = & self . desc . field_reorder {
209+ Ok ( DataBlock :: from_iter (
210+ field_reorder
211+ . iter ( )
212+ . map ( |offset| block. get_by_offset ( * offset) . clone ( ) ) ,
213+ block. num_rows ( ) ,
214+ ) )
215+ } else {
216+ Ok ( block)
217+ }
218+ }
219+ }
220+
221+ impl < ' a > JoinStream for NestedLoopJoinStream < ' a > {
222+ fn next ( & mut self ) -> Result < Option < DataBlock > > {
223+ loop {
224+ if self . matches . len ( ) >= self . max_block_size {
225+ return Ok ( Some ( self . emit_block ( self . max_block_size ) ?) ) ;
226+ }
227+
228+ if self . build_block_index >= self . build_blocks . len ( ) {
229+ return if self . matches . is_empty ( ) {
230+ Ok ( None )
231+ } else {
232+ Ok ( Some ( self . emit_block ( self . matches . len ( ) ) ?) )
233+ } ;
234+ }
235+
236+ self . process_next_row ( ) ?;
237+ }
190238 }
191239}
0 commit comments