@@ -1,4 +1,5 @@
 module ParallelUtilities
+using ProgressMeter
 
 using Reexport
 @reexport using Distributed
@@ -763,7 +764,8 @@
     put!(pipe.selfchannels.out,valT)
 end
 
-function mapTreeNode(fmap::Function,iterator,pipe::BranchChannel,args...;kwargs...)
+function mapTreeNode(fmap::Function,iterator,pipe::BranchChannel,
+        progress::RemoteChannel,args...;kwargs...)
     # Evaluate the function
     # Store the error flag locally
     # If there are no errors then store the result locally
@@ -775,6 +777,8 @@ function mapTreeNode(fmap::Function,iterator,pipe::BranchChannel,args...;kwargs...)
     catch
         put!(pipe.selfchannels.err,true)
         rethrow()
+    finally
+        put!(progress,(true,false))
     end
 end
 
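The map side of the change follows a report-in-finally pattern: completion is signalled through a `RemoteChannel` of `(mapdone, reddone)` flag tuples from a `finally` block, so the consumer's progress counter advances whether or not `fmap` throws. A minimal stand-alone sketch of that pattern, with hypothetical names (`runmap`, `progress`) rather than the package's own API:

```julia
using Distributed

# Hedged sketch: `runmap` stands in for mapTreeNode. The flag tuple is
# sent in `finally`, so it goes out even when f throws, and the caller's
# fixed-count take! loop never stalls.
function runmap(f, progress::RemoteChannel, args...)
    try
        f(args...)
    finally
        put!(progress, (true, false))  # map finished, successfully or not
    end
end

progress = RemoteChannel(() -> Channel{Tuple{Bool,Bool}}(1))
runmap(x -> x^2, progress, 3)
take!(progress)  # == (true, false)
```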
@@ -813,7 +817,8 @@ function reducedvalue(freduce::Function,pipe::BranchChannel{Tmap,Tred},::Sorted)
     Tred(freduce(value(v) for v in vals))
 end
 
-function reduceTreeNode(freduce::Function,pipe::BranchChannel{Tmap,Tred},ifsort::Ordering) where {Tmap,Tred}
+function reduceTreeNode(freduce::Function,pipe::BranchChannel{Tmap,Tred},
+        ifsort::Ordering,progress::RemoteChannel) where {Tmap,Tred}
     # This function communicates with the parent and children
 
     # Start by checking if there is any error locally in the map,
@@ -831,9 +836,12 @@ function reduceTreeNode(freduce::Function,pipe::BranchChannel{Tmap,Tred},ifsort::Ordering) where {Tmap,Tred}
         catch e
             put!(pipe.parentchannels.err,true)
             rethrow()
+        finally
+            put!(progress,(false,true))
         end
     else
         put!(pipe.parentchannels.err,true)
+        put!(progress,(false,true))
     end
 
     finalize(pipe)
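On the reduction side, `(false, true)` is sent on every exit path: the `finally` covers both success and a thrown error, and the `else` branch covers the case where the reduction is skipped because the map already failed. The resulting invariant is that each worker contributes exactly one map tuple and one reduce tuple, which is what lets the collector further down take a fixed number of items. A sketch of that counting invariant, with illustrative names rather than the package's API:

```julia
using Distributed

# Each of n workers sends exactly one (true, false) map tuple and one
# (false, true) reduce tuple, so taking 2n tuples always terminates.
function count_progress(n)
    progress = RemoteChannel(() -> Channel{Tuple{Bool,Bool}}(2n))
    for _ in 1:n
        put!(progress, (true, false))   # one per map task
        put!(progress, (false, true))   # one per reduce task
    end
    nmap = nred = 0
    for _ in 1:2n
        mapdone, reddone = take!(progress)
        nmap += mapdone   # Bool promotes to Int
        nred += reddone
    end
    (nmap, nred)
end

@assert count_progress(2) == (2, 2)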
@@ -853,15 +861,34 @@ function pmapreduceworkers(fmap::Function,freduce::Function,iterators::Tuple,
 
     num_workers_active = nworkersactive(iterators)
 
-    # Run the function on each processor and compute the reduction at each node
-    @sync for (rank,mypipe) in enumerate(branches)
-        @async begin
-            p = mypipe.p
-            iterable_on_proc = evenlyscatterproduct(iterators,num_workers_active,rank)
+    nmap,nred = 0,0
+    progresschannel = RemoteChannel(()->Channel{Tuple{Bool,Bool}}(2num_workers_active))
+    progressbar = Progress(2num_workers_active,1,"Progress in pmapreduce : ")
 
-            @spawnat p mapTreeNode(fmap,iterable_on_proc,mypipe,args...;kwargs...)
-            @spawnat p reduceTreeNode(freduce,mypipe,ord)
+    # Run the function on each processor and compute the reduction at each node
+    @sync begin
+        for (rank,mypipe) in enumerate(branches)
+            @async begin
+                p = mypipe.p
+                iterable_on_proc = evenlyscatterproduct(iterators,num_workers_active,rank)
+
+                @spawnat p mapTreeNode(fmap,iterable_on_proc,mypipe,
+                    progresschannel,args...;kwargs...)
+                @spawnat p reduceTreeNode(freduce,mypipe,ord,progresschannel)
+            end
+        end
+
+        for i = 1:2num_workers_active
+            mapdone,reddone = take!(progresschannel)
+            if mapdone
+                nmap += 1
+            end
+            if reddone
+                nred += 1
+            end
+            next!(progressbar;showvalues=[(:map,nmap),(:reduce,nred)])
         end
+        finish!(progressbar)
     end
 
     return_unless_error(first(branches))
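For reference, the ProgressMeter calls this hunk relies on can be exercised on their own: `Progress(n, dt, desc)` creates a bar with `n` steps updated at most every `dt` seconds, `next!` advances it one step and prints the `showvalues` pairs beneath it, and `finish!` forces it to completion. A stand-alone sketch mirroring the hunk's call pattern (the counter values here are synthetic):

```julia
using ProgressMeter

function demo_progress(nsteps)
    # same positional (n, dt, desc) form as the hunk above
    bar = Progress(nsteps, 1, "Progress in pmapreduce : ")
    nmap, nred = 0, 0
    for i in 1:nsteps
        isodd(i) ? (nmap += 1) : (nred += 1)
        sleep(0.3)
        # advance one step; print the running counters beneath the bar
        next!(bar; showvalues = [(:map, nmap), (:reduce, nred)])
    end
    finish!(bar)  # force the bar to 100% (a no-op if already complete)
end

demo_progress(4)
```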