@@ -90,20 +90,33 @@ function reducedvalue(freduce::Function,rank,
9090
9191 N = nchildren (pipe)
9292 leftchild = N > 0
93- vals = Vector {Tred} (undef,N+ 1 )
93+ selfvalpresent = rank > 0
94+ vals = Vector {Tred} (undef,N + selfvalpresent)
9495 @sync begin
9596 @async begin
96- selfval = take! (pipe. selfchannels. out):: Tmap
97- selfvalred = freduce ((value (selfval),))
98- ind = 1 + leftchild
99- v = pval (rank,selfvalred)
100- vals[ind] = v
97+ if selfvalpresent
98+ selfval = take! (pipe. selfchannels. out):: Tmap
99+ selfvalred = freduce ((value (selfval),))
100+ pv = pval (rank,selfvalred)
101+ ind = selfvalpresent + leftchild
102+ vals[ind] = pv
103+ end
101104 end
102- @async for i= 2 : N+ 1
103- pv = take! (pipe. childrenchannels. out) :: Tred
104- shift = pv. rank > rank ? 1 : - 1
105- ind = shift + leftchild + 1
106- vals[ind] = pv
105+ @async begin
106+ if selfvalpresent
107+ for i= 1 : N
108+ pv = take! (pipe. childrenchannels. out) :: Tred
109+ shift = pv. rank > rank ? 1 : - 1
110+ ind = shift + leftchild + 1
111+ vals[ind] = pv
112+ end
113+ else
114+ for i= 1 : N
115+ pv = take! (pipe. childrenchannels. out) :: Tred
116+ vals[i] = pv
117+ end
118+ sort! (vals,by= pv-> pv. rank)
119+ end
107120 end
108121 end
109122
@@ -125,7 +138,7 @@ function reduceTreeNode(freduce::Function,rank,pipe::BranchChannel{Tmap,Tred},
125138 anyerr = take! (pipe. selfchannels. err)
126139 else
127140 anyerr = false
128- end
141+ end
129142 anyerr = anyerr ||
130143 any (take! (pipe. childrenchannels. err) for i= 1 : nchildren (pipe))
131144
@@ -542,7 +555,7 @@ function pmapreduce(fmap::Function,Tmap::Type,freduce::Function,Tred::Type,
542555 iterators:: Tuple ,args... ;kwargs... )
543556
544557 tree,branches = createbranchchannels (pval{Tmap},pval{Tred},
545- iterators,OrderedBinaryTree )
558+ iterators, SegmentedOrderedBinaryTree )
546559 pmapreduceworkers (fmap,freduce,iterators,tree,
547560 branches,Sorted (),args... ;kwargs... )
548561end
0 commit comments