88#include < Windows.h>
99#endif
1010#include " Common/Cpp/PanicDump.h"
11+ #include " Common/Cpp/Containers/Pimpl.tpp"
12+ #include " Common/Cpp/CpuUtilization/CpuUtilization.h"
1113#include " ParallelTaskRunner.h"
1214
1315// #include <iostream>
1719namespace PokemonAutomation {
1820
1921
22+ class StopWatch {
23+ public:
24+ StopWatch ()
25+ : m_total(WallDuration::zero())
26+ , m_last_start(WallClock::max())
27+ {}
28+
29+ void start (){
30+ // cout << "start" << endl;
31+ if (m_last_start != WallClock::max ()){
32+ return ;
33+ }
34+ m_last_start = current_time ();
35+ }
36+ void stop (){
37+ // cout << "stop" << endl;
38+ if (m_last_start == WallClock::max ()){
39+ return ;
40+ }
41+ m_total += current_time () - m_last_start;
42+ m_last_start = WallClock::max ();
43+ }
44+ WallDuration total () const {
45+ WallDuration ret = m_total;
46+ if (m_last_start != WallClock::max ()){
47+ ret += current_time () - m_last_start;
48+ }
49+ // cout << "total: " << std::chrono::duration_cast<Milliseconds>(ret).count() << endl;
50+ return ret;
51+ }
52+
53+ private:
54+ WallDuration m_total;
55+ WallClock m_last_start;
56+ };
57+
58+
59+
60+
61+
62+ class ParallelTaskRunnerCore final {
63+ public:
64+ ParallelTaskRunnerCore (
65+ std::function<void ()>&& new_thread_callback,
66+ size_t starting_threads,
67+ size_t max_threads
68+ );
69+ ~ParallelTaskRunnerCore ();
70+
71+ size_t max_threads () const {
72+ return m_max_threads;
73+ }
74+ WallDuration cpu_time () const ;
75+
76+ void wait_for_everything ();
77+
78+ // Dispatch the function. If there are no threads available, it waits until
79+ // there are.
80+ std::shared_ptr<AsyncTask> blocking_dispatch (std::function<void ()>&& func);
81+
82+ // Dispatch the function. Returns null if no threads are available.
83+ // "func" will be moved-from only on success.
84+ std::shared_ptr<AsyncTask> try_dispatch (std::function<void ()>& func);
85+
86+ void run_in_parallel (
87+ const std::function<void (size_t index)>& func,
88+ size_t start, size_t end,
89+ size_t block_size = 0
90+ );
91+
92+
93+ private:
94+ struct ThreadData {
95+ std::thread thread;
96+ ThreadHandle handle;
97+ StopWatch runtime;
98+ };
99+
100+ void spawn_thread ();
101+ void thread_loop (ThreadData& data);
102+
103+
104+ private:
105+ struct Data ;
106+
107+ std::function<void ()> m_new_thread_callback;
108+ size_t m_max_threads;
109+ std::deque<std::shared_ptr<AsyncTask>> m_queue;
110+
111+ std::deque<ThreadData> m_threads;
112+
113+ bool m_stopping;
114+ size_t m_busy_count;
115+ mutable std::mutex m_lock;
116+ std::condition_variable m_thread_cv;
117+ std::condition_variable m_dispatch_cv;
118+ };
119+
120+
121+
122+
123+
124+
20125
21126ParallelTaskRunner::ParallelTaskRunner (
22127 std::function<void ()>&& new_thread_callback,
23128 size_t starting_threads,
24129 size_t max_threads
130+ )
131+ : m_core(
132+ CONSTRUCT_TOKEN,
133+ std::move (new_thread_callback),
134+ starting_threads,
135+ max_threads
136+ )
137+ {}
138+ ParallelTaskRunner::~ParallelTaskRunner () = default ;
139+ size_t ParallelTaskRunner::max_threads () const {
140+ return m_core->max_threads ();
141+ }
142+ WallDuration ParallelTaskRunner::cpu_time () const {
143+ return m_core->cpu_time ();
144+ }
145+ void ParallelTaskRunner::wait_for_everything (){
146+ m_core->wait_for_everything ();
147+ }
148+ std::shared_ptr<AsyncTask> ParallelTaskRunner::blocking_dispatch (std::function<void ()>&& func){
149+ return m_core->blocking_dispatch (std::move (func));
150+ }
151+ std::shared_ptr<AsyncTask> ParallelTaskRunner::try_dispatch (std::function<void ()>& func){
152+ return m_core->try_dispatch (func);
153+ }
154+ void ParallelTaskRunner::run_in_parallel (
155+ const std::function<void (size_t index)>& func,
156+ size_t start, size_t end,
157+ size_t block_size
158+ ){
159+ m_core->run_in_parallel (func, start, end, block_size);
160+ }
161+
162+
163+
164+
165+
166+
167+
168+
169+ ParallelTaskRunnerCore::ParallelTaskRunnerCore (
170+ std::function<void ()>&& new_thread_callback,
171+ size_t starting_threads,
172+ size_t max_threads
25173)
26174 : m_new_thread_callback(std::move(new_thread_callback))
27175 , m_max_threads(max_threads == 0 ? std::thread::hardware_concurrency() : max_threads)
28176 , m_stopping(false )
29177 , m_busy_count(0 )
30178{
31179 for (size_t c = 0 ; c < starting_threads; c++){
32- m_threads. emplace_back (run_with_catch, " ParallelTaskRunner::thread_loop() " , [ this ]{ thread_loop (); } );
180+ spawn_thread ( );
33181 }
34182}
35- ParallelTaskRunner ::~ParallelTaskRunner (){
183+ ParallelTaskRunnerCore ::~ParallelTaskRunnerCore (){
36184 {
37185 std::lock_guard<std::mutex> lg (m_lock);
38186 m_stopping = true ;
39187 m_thread_cv.notify_all ();
40188// m_dispatch_cv.notify_all();
41189 }
42- for (std::thread & thread : m_threads){
43- thread.join ();
190+ for (ThreadData & thread : m_threads){
191+ thread.thread . join ();
44192 }
45193 for (auto & task : m_queue){
46194 task->signal ();
47195 }
48196}
49197
50- void ParallelTaskRunner::wait_for_everything (){
198+ WallDuration ParallelTaskRunnerCore::cpu_time () const {
199+ // TODO: Don't lock the entire queue.
200+ WallDuration ret = WallDuration::zero ();
201+ std::lock_guard<std::mutex> lg (m_lock);
202+ for (const ThreadData& thread : m_threads){
203+ // ret += thread_cpu_time(thread.handle);
204+ ret += thread.runtime .total ();
205+ }
206+ return ret;
207+ }
208+
209+ void ParallelTaskRunnerCore::wait_for_everything (){
51210 std::unique_lock<std::mutex> lg (m_lock);
52211 m_dispatch_cv.wait (lg, [this ]{
53212 return m_queue.size () + m_busy_count == 0 ;
54213 });
55214}
56215
57- std::shared_ptr<AsyncTask> ParallelTaskRunner ::blocking_dispatch (std::function<void ()>&& func){
216+ std::shared_ptr<AsyncTask> ParallelTaskRunnerCore ::blocking_dispatch (std::function<void ()>&& func){
58217 std::shared_ptr<AsyncTask> task (new AsyncTask (std::move (func)));
59218
60219 std::unique_lock<std::mutex> lg (m_lock);
@@ -67,14 +226,14 @@ std::shared_ptr<AsyncTask> ParallelTaskRunner::blocking_dispatch(std::function<v
67226 m_queue.emplace_back (task);
68227
69228 if (m_queue.size () + m_busy_count > m_threads.size ()){
70- m_threads. emplace_back (run_with_catch, " ParallelTaskRunner::thread_loop() " , [ this ]{ thread_loop (); } );
229+ spawn_thread ( );
71230 }
72231
73232 m_thread_cv.notify_one ();
74233
75234 return task;
76235}
77- std::shared_ptr<AsyncTask> ParallelTaskRunner ::try_dispatch (std::function<void ()>& func){
236+ std::shared_ptr<AsyncTask> ParallelTaskRunnerCore ::try_dispatch (std::function<void ()>& func){
78237 std::lock_guard<std::mutex> lg (m_lock);
79238
80239 if (m_queue.size () + m_busy_count >= m_max_threads){
@@ -87,7 +246,7 @@ std::shared_ptr<AsyncTask> ParallelTaskRunner::try_dispatch(std::function<void()
87246 m_queue.emplace_back (task);
88247
89248 if (m_queue.size () + m_busy_count > m_threads.size ()){
90- m_threads. emplace_back (run_with_catch, " ParallelTaskRunner::thread_loop() " , [ this ]{ thread_loop (); } );
249+ spawn_thread ( );
91250 }
92251
93252 m_thread_cv.notify_one ();
@@ -96,7 +255,7 @@ std::shared_ptr<AsyncTask> ParallelTaskRunner::try_dispatch(std::function<void()
96255}
97256
98257
99- void ParallelTaskRunner ::run_in_parallel (
258+ void ParallelTaskRunnerCore ::run_in_parallel (
100259 const std::function<void (size_t index)>& func,
101260 size_t start, size_t end,
102261 size_t block_size
@@ -126,12 +285,23 @@ void ParallelTaskRunner::run_in_parallel(
126285
127286
128287
288+ void ParallelTaskRunnerCore::spawn_thread (){
289+ ThreadData& handle = m_threads.emplace_back ();
290+ handle.thread = std::thread (
291+ run_with_catch,
292+ " ParallelTaskRunner::thread_loop()" ,
293+ [&, this ]{ thread_loop (handle); }
294+ );
295+ }
296+ void ParallelTaskRunnerCore::thread_loop (ThreadData& data){
297+ data.handle = current_thread_handle ();
129298
130- void ParallelTaskRunner::thread_loop (){
131299 if (m_new_thread_callback){
132300 m_new_thread_callback ();
133301 }
134302
303+ data.runtime .start ();
304+
135305 bool busy = false ;
136306 while (true ){
137307 std::shared_ptr<AsyncTask> task;
@@ -147,7 +317,9 @@ void ParallelTaskRunner::thread_loop(){
147317 return ;
148318 }
149319 if (m_queue.empty ()){
320+ data.runtime .stop ();
150321 m_thread_cv.wait (lg);
322+ data.runtime .start ();
151323 continue ;
152324 }
153325
@@ -173,4 +345,12 @@ void ParallelTaskRunner::thread_loop(){
173345
174346
175347
348+
349+ // template class Pimpl<ParallelTaskRunnerCore>;
350+
351+
352+
353+
354+
355+
176356}
0 commit comments