1313// limitations under the License.
1414
1515use std:: any:: Any ;
16- use std:: sync:: Arc ;
1716
1817use databend_common_exception:: Result ;
1918use databend_common_expression:: DataSchemaRef ;
2019use databend_common_expression:: DataSchemaRefExt ;
2120use databend_common_expression:: RemoteExpr ;
2221use databend_common_functions:: BUILTIN_FUNCTIONS ;
22+ use databend_common_pipeline:: core:: InputPort ;
23+ use databend_common_pipeline:: core:: OutputPort ;
24+ use databend_common_pipeline:: core:: Pipe ;
25+ use databend_common_pipeline:: core:: PipeItem ;
2326use databend_common_pipeline:: core:: ProcessorPtr ;
24- use databend_common_pipeline:: sinks:: Sinker ;
2527use databend_common_sql:: executor:: cast_expr_to_non_null_boolean;
2628use databend_common_sql:: optimizer:: ir:: SExpr ;
2729use databend_common_sql:: plans:: JoinType ;
@@ -38,8 +40,7 @@ use super::PhysicalPlan;
3840use super :: PhysicalPlanBuilder ;
3941use super :: PhysicalPlanMeta ;
4042use crate :: pipelines:: processors:: transforms:: LoopJoinState ;
41- use crate :: pipelines:: processors:: transforms:: TransformLoopJoinLeft ;
42- use crate :: pipelines:: processors:: transforms:: TransformLoopJoinRight ;
43+ use crate :: pipelines:: processors:: transforms:: TransformLoopJoin ;
4344use crate :: pipelines:: PipelineBuilder ;
4445
4546#[ derive( Clone , Debug , serde:: Serialize , serde:: Deserialize ) ]
@@ -110,25 +111,56 @@ impl IPhysicalPlan for NestedLoopJoin {
110111 }
111112
112113 fn build_pipeline2 ( & self , builder : & mut PipelineBuilder ) -> Result < ( ) > {
113- let state = Arc :: new ( LoopJoinState :: new ( builder. ctx . clone ( ) , self ) ) ;
114- self . build_right ( state. clone ( ) , builder) ?;
115- self . build_left ( state, builder)
116- }
117- }
114+ // Build right side (build side)
115+ let right_side_builder = builder. create_sub_pipeline_builder ( ) ;
116+ let mut right_res = right_side_builder. finalize ( & self . right ) ?;
117+ let mut build_sinks = right_res. main_pipeline . take_sinks ( ) ;
118+ builder
119+ . pipelines
120+ . push ( right_res. main_pipeline . finalize ( None ) ) ;
121+ builder. pipelines . extend ( right_res. sources_pipelines ) ;
118122
119- impl NestedLoopJoin {
120- fn build_left ( & self , state : Arc < LoopJoinState > , builder : & mut PipelineBuilder ) -> Result < ( ) > {
123+ // Build left side (probe side)
121124 self . left . build_pipeline ( builder) ?;
122125
123- let max_threads = builder. settings . get_max_threads ( ) ? as usize ;
124- builder. main_pipeline . try_resize ( max_threads) ?;
125- builder. main_pipeline . add_transform ( |input, output| {
126- Ok ( ProcessorPtr :: create ( TransformLoopJoinLeft :: create (
127- input,
128- output,
129- state. clone ( ) ,
130- ) ) )
131- } ) ?;
126+ let output_len = std:: cmp:: max ( build_sinks. len ( ) , builder. main_pipeline . output_len ( ) ) ;
127+ builder. main_pipeline . resize ( output_len, false ) ?;
128+ let probe_sinks = builder. main_pipeline . take_sinks ( ) ;
129+
130+ if output_len != build_sinks. len ( ) {
131+ builder. main_pipeline . extend_sinks ( build_sinks) ;
132+ builder. main_pipeline . resize ( output_len, false ) ?;
133+ build_sinks = builder. main_pipeline . take_sinks ( ) ;
134+ }
135+
136+ debug_assert_eq ! ( build_sinks. len( ) , probe_sinks. len( ) ) ;
137+
138+ let join_pipe_items = build_sinks
139+ . into_iter ( )
140+ . zip ( probe_sinks)
141+ . map ( |( build_sink, probe_sink) | {
142+ builder. main_pipeline . extend_sinks ( [ build_sink, probe_sink] ) ;
143+
144+ let build_input = InputPort :: create ( ) ;
145+ let probe_input = InputPort :: create ( ) ;
146+ let joined_output = OutputPort :: create ( ) ;
147+
148+ let join_state = LoopJoinState :: new ( builder. ctx . clone ( ) , self ) ;
149+ let loop_join = ProcessorPtr :: create ( TransformLoopJoin :: create (
150+ build_input. clone ( ) ,
151+ probe_input. clone ( ) ,
152+ joined_output. clone ( ) ,
153+ Box :: new ( join_state) ,
154+ ) ) ;
155+
156+ PipeItem :: create ( loop_join, vec ! [ build_input, probe_input] , vec ! [
157+ joined_output,
158+ ] )
159+ } )
160+ . collect ( ) ;
161+
162+ let join_pipe = Pipe :: create ( output_len * 2 , output_len, join_pipe_items) ;
163+ builder. main_pipeline . add_pipe ( join_pipe) ;
132164
133165 match self . conditions . len ( ) {
134166 0 => Ok ( ( ) ) ,
@@ -148,24 +180,6 @@ impl NestedLoopJoin {
148180 }
149181 }
150182 }
151-
152- fn build_right ( & self , state : Arc < LoopJoinState > , builder : & mut PipelineBuilder ) -> Result < ( ) > {
153- let right_side_builder = builder. create_sub_pipeline_builder ( ) ;
154-
155- let mut right_res = right_side_builder. finalize ( & self . right ) ?;
156- right_res. main_pipeline . add_sink ( |input| {
157- Ok ( ProcessorPtr :: create ( Sinker :: create (
158- input,
159- TransformLoopJoinRight :: create ( state. clone ( ) ) ?,
160- ) ) )
161- } ) ?;
162-
163- builder
164- . pipelines
165- . push ( right_res. main_pipeline . finalize ( None ) ) ;
166- builder. pipelines . extend ( right_res. sources_pipelines ) ;
167- Ok ( ( ) )
168- }
169183}
170184
171185impl PhysicalPlanBuilder {
0 commit comments