Skip to content

Commit 02eeb15

Browse files
committed
More global thread pools. Remove fire-and-forget from ParallelTaskRunner.
1 parent 8a3463a commit 02eeb15

File tree

49 files changed

+329
-305
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+329
-305
lines changed

Common/Cpp/Concurrency/AsyncDispatcher.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ AsyncTask::~AsyncTask(){
1919
std::unique_lock<std::mutex> lg(m_lock);
2020
m_cv.wait(lg, [this]{ return m_finished; });
2121
}
22+
bool AsyncTask::is_finished() const{
23+
std::lock_guard<std::mutex> lg(m_lock);
24+
return m_finished;
25+
}
2226
void AsyncTask::rethrow_exceptions(){
2327
if (!m_stopped_with_error.load(std::memory_order_acquire)){
2428
return;

Common/Cpp/Concurrency/AsyncDispatcher.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class AsyncTask{
2929
// Wait for the task to finish before destructing. Doesn't rethrow exceptions.
3030
~AsyncTask();
3131

32+
bool is_finished() const;
33+
3234
// If the task ended with an exception, rethrow it here.
3335
// This does not clear the exception.
3436
void rethrow_exceptions();
@@ -55,7 +57,7 @@ class AsyncTask{
5557
bool m_finished;
5658
std::atomic<bool> m_stopped_with_error;
5759
std::exception_ptr m_exception;
58-
std::mutex m_lock;
60+
mutable std::mutex m_lock;
5961
std::condition_variable m_cv;
6062
};
6163

Common/Cpp/Concurrency/ParallelTaskRunner.cpp

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "Common/Cpp/PanicDump.h"
1111
#include "Common/Cpp/Containers/Pimpl.tpp"
1212
#include "Common/Cpp/CpuUtilization/CpuUtilization.h"
13+
#include "Common/Cpp/Stopwatch.h"
1314
#include "ParallelTaskRunner.h"
1415

1516
//#include <iostream>
@@ -19,41 +20,6 @@
1920
namespace PokemonAutomation{
2021

2122

22-
class StopWatch{
23-
public:
24-
StopWatch()
25-
: m_total(WallDuration::zero())
26-
, m_last_start(WallClock::max())
27-
{}
28-
29-
void start(){
30-
// cout << "start" << endl;
31-
if (m_last_start != WallClock::max()){
32-
return;
33-
}
34-
m_last_start = current_time();
35-
}
36-
void stop(){
37-
// cout << "stop" << endl;
38-
if (m_last_start == WallClock::max()){
39-
return;
40-
}
41-
m_total += current_time() - m_last_start;
42-
m_last_start = WallClock::max();
43-
}
44-
WallDuration total() const{
45-
WallDuration ret = m_total;
46-
if (m_last_start != WallClock::max()){
47-
ret += current_time() - m_last_start;
48-
}
49-
// cout << "total: " << std::chrono::duration_cast<Milliseconds>(ret).count() << endl;
50-
return ret;
51-
}
52-
53-
private:
54-
WallDuration m_total;
55-
WallClock m_last_start;
56-
};
5723

5824

5925

@@ -68,20 +34,25 @@ class ParallelTaskRunnerCore final{
6834
);
6935
~ParallelTaskRunnerCore();
7036

37+
size_t current_threads() const{
38+
std::lock_guard<std::mutex> lg(m_lock);
39+
return m_threads.size();
40+
}
7141
size_t max_threads() const{
7242
return m_max_threads;
7343
}
7444
WallDuration cpu_time() const;
7545

76-
void wait_for_everything();
46+
void ensure_threads(size_t threads);
47+
// void wait_for_everything();
7748

7849
// Dispatch the function. If there are no threads available, it waits until
7950
// there are.
80-
std::shared_ptr<AsyncTask> blocking_dispatch(std::function<void()>&& func);
51+
[[nodiscard]] std::unique_ptr<AsyncTask> blocking_dispatch(std::function<void()>&& func);
8152

8253
// Dispatch the function. Returns null if no threads are available.
8354
// "func" will be moved-from only on success.
84-
std::shared_ptr<AsyncTask> try_dispatch(std::function<void()>& func);
55+
[[nodiscard]] std::unique_ptr<AsyncTask> try_dispatch(std::function<void()>& func);
8556

8657
void run_in_parallel(
8758
const std::function<void(size_t index)>& func,
@@ -94,7 +65,7 @@ class ParallelTaskRunnerCore final{
9465
struct ThreadData{
9566
std::thread thread;
9667
ThreadHandle handle;
97-
StopWatch runtime;
68+
Stopwatch runtime;
9869
};
9970

10071
void spawn_thread();
@@ -106,7 +77,7 @@ class ParallelTaskRunnerCore final{
10677

10778
std::function<void()> m_new_thread_callback;
10879
size_t m_max_threads;
109-
std::deque<std::shared_ptr<AsyncTask>> m_queue;
80+
std::deque<AsyncTask*> m_queue;
11081

11182
std::deque<ThreadData> m_threads;
11283

@@ -136,19 +107,25 @@ ParallelTaskRunner::ParallelTaskRunner(
136107
)
137108
{}
138109
ParallelTaskRunner::~ParallelTaskRunner() = default;
110+
size_t ParallelTaskRunner::current_threads() const{
111+
return m_core->current_threads();
112+
}
139113
size_t ParallelTaskRunner::max_threads() const{
140114
return m_core->max_threads();
141115
}
142116
WallDuration ParallelTaskRunner::cpu_time() const{
143117
return m_core->cpu_time();
144118
}
145-
void ParallelTaskRunner::wait_for_everything(){
146-
m_core->wait_for_everything();
119+
void ParallelTaskRunner::ensure_threads(size_t threads){
120+
m_core->ensure_threads(threads);
147121
}
148-
std::shared_ptr<AsyncTask> ParallelTaskRunner::blocking_dispatch(std::function<void()>&& func){
122+
//void ParallelTaskRunner::wait_for_everything(){
123+
// m_core->wait_for_everything();
124+
//}
125+
std::unique_ptr<AsyncTask> ParallelTaskRunner::blocking_dispatch(std::function<void()>&& func){
149126
return m_core->blocking_dispatch(std::move(func));
150127
}
151-
std::shared_ptr<AsyncTask> ParallelTaskRunner::try_dispatch(std::function<void()>& func){
128+
std::unique_ptr<AsyncTask> ParallelTaskRunner::try_dispatch(std::function<void()>& func){
152129
return m_core->try_dispatch(func);
153130
}
154131
void ParallelTaskRunner::run_in_parallel(
@@ -206,15 +183,24 @@ WallDuration ParallelTaskRunnerCore::cpu_time() const{
206183
return ret;
207184
}
208185

186+
187+
void ParallelTaskRunnerCore::ensure_threads(size_t threads){
188+
std::lock_guard<std::mutex> lg(m_lock);
189+
while (m_threads.size() < threads){
190+
spawn_thread();
191+
}
192+
}
193+
#if 0
209194
void ParallelTaskRunnerCore::wait_for_everything(){
210195
std::unique_lock<std::mutex> lg(m_lock);
211196
m_dispatch_cv.wait(lg, [this]{
212197
return m_queue.size() + m_busy_count == 0;
213198
});
214199
}
200+
#endif
215201

216-
std::shared_ptr<AsyncTask> ParallelTaskRunnerCore::blocking_dispatch(std::function<void()>&& func){
217-
std::shared_ptr<AsyncTask> task(new AsyncTask(std::move(func)));
202+
std::unique_ptr<AsyncTask> ParallelTaskRunnerCore::blocking_dispatch(std::function<void()>&& func){
203+
std::unique_ptr<AsyncTask> task(new AsyncTask(std::move(func)));
218204

219205
std::unique_lock<std::mutex> lg(m_lock);
220206

@@ -223,7 +209,7 @@ std::shared_ptr<AsyncTask> ParallelTaskRunnerCore::blocking_dispatch(std::functi
223209
});
224210

225211
// Enqueue task.
226-
m_queue.emplace_back(task);
212+
m_queue.emplace_back(task.get());
227213

228214
if (m_queue.size() + m_busy_count > m_threads.size()){
229215
spawn_thread();
@@ -233,17 +219,17 @@ std::shared_ptr<AsyncTask> ParallelTaskRunnerCore::blocking_dispatch(std::functi
233219

234220
return task;
235221
}
236-
std::shared_ptr<AsyncTask> ParallelTaskRunnerCore::try_dispatch(std::function<void()>& func){
222+
std::unique_ptr<AsyncTask> ParallelTaskRunnerCore::try_dispatch(std::function<void()>& func){
237223
std::lock_guard<std::mutex> lg(m_lock);
238224

239225
if (m_queue.size() + m_busy_count >= m_max_threads){
240226
return nullptr;
241227
}
242228

243-
std::shared_ptr<AsyncTask> task(new AsyncTask(std::move(func)));
229+
std::unique_ptr<AsyncTask> task(new AsyncTask(std::move(func)));
244230

245231
// Enqueue task.
246-
m_queue.emplace_back(task);
232+
m_queue.emplace_back(task.get());
247233

248234
if (m_queue.size() + m_busy_count > m_threads.size()){
249235
spawn_thread();
@@ -266,7 +252,7 @@ void ParallelTaskRunnerCore::run_in_parallel(
266252
size_t total = end - start;
267253
size_t blocks = (total + block_size - 1) / block_size;
268254

269-
std::vector<std::shared_ptr<AsyncTask>> tasks;
255+
std::vector<std::unique_ptr<AsyncTask>> tasks;
270256
for (size_t c = 0; c < blocks; c++){
271257
tasks.emplace_back(blocking_dispatch([=, &func]{
272258
size_t s = start + c * block_size;
@@ -278,7 +264,7 @@ void ParallelTaskRunnerCore::run_in_parallel(
278264
}));
279265
}
280266

281-
for (std::shared_ptr<AsyncTask>& task : tasks){
267+
for (std::unique_ptr<AsyncTask>& task : tasks){
282268
task->wait_and_rethrow_exceptions();
283269
}
284270
}
@@ -304,7 +290,7 @@ void ParallelTaskRunnerCore::thread_loop(ThreadData& data){
304290

305291
bool busy = false;
306292
while (true){
307-
std::shared_ptr<AsyncTask> task;
293+
AsyncTask* task;
308294
{
309295
std::unique_lock<std::mutex> lg(m_lock);
310296
if (busy){
@@ -334,10 +320,6 @@ void ParallelTaskRunnerCore::thread_loop(ThreadData& data){
334320
task->m_task();
335321
}catch (...){
336322
task->m_exception = std::current_exception();
337-
// std::lock_guard<std::mutex> lg(m_lock);
338-
// for (std::shared_ptr<AsyncTask>& t : m_queue){
339-
// t->signal();
340-
// }
341323
}
342324
task->signal();
343325
}

Common/Cpp/Concurrency/ParallelTaskRunner.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,21 @@ class ParallelTaskRunner final{
2626
);
2727
~ParallelTaskRunner();
2828

29+
size_t current_threads() const;
2930
size_t max_threads() const;
3031
WallDuration cpu_time() const;
3132

32-
void wait_for_everything();
33+
void ensure_threads(size_t threads);
34+
35+
// void wait_for_everything();
3336

3437
// Dispatch the function. If there are no threads available, it waits until
3538
// there are.
36-
std::shared_ptr<AsyncTask> blocking_dispatch(std::function<void()>&& func);
39+
[[nodiscard]] std::unique_ptr<AsyncTask> blocking_dispatch(std::function<void()>&& func);
3740

3841
// Dispatch the function. Returns null if no threads are available.
3942
// "func" will be moved-from only on success.
40-
std::shared_ptr<AsyncTask> try_dispatch(std::function<void()>& func);
43+
[[nodiscard]] std::unique_ptr<AsyncTask> try_dispatch(std::function<void()>& func);
4144

4245
void run_in_parallel(
4346
const std::function<void(size_t index)>& func,

Common/Cpp/Stopwatch.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/* Stop Watch
2+
*
3+
* From: https://github.com/PokemonAutomation/
4+
*
5+
*/
6+
7+
#ifndef PokemonAutomation_VideoPipeline_SnapshotManager_H
8+
#define PokemonAutomation_VideoPipeline_SnapshotManager_H
9+
10+
#include "Time.h"
11+
12+
namespace PokemonAutomation{
13+
14+
15+
class Stopwatch{
16+
public:
17+
Stopwatch()
18+
: m_total(WallDuration::zero())
19+
, m_last_start(WallClock::max())
20+
{}
21+
22+
void start(){
23+
// cout << "start" << endl;
24+
if (m_last_start != WallClock::max()){
25+
return;
26+
}
27+
m_last_start = current_time();
28+
}
29+
void stop(){
30+
// cout << "stop" << endl;
31+
if (m_last_start == WallClock::max()){
32+
return;
33+
}
34+
m_total += current_time() - m_last_start;
35+
m_last_start = WallClock::max();
36+
}
37+
WallDuration total() const{
38+
WallDuration ret = m_total;
39+
if (m_last_start != WallClock::max()){
40+
ret += current_time() - m_last_start;
41+
}
42+
// cout << "total: " << std::chrono::duration_cast<Milliseconds>(ret).count() << endl;
43+
return ret;
44+
}
45+
46+
private:
47+
WallDuration m_total;
48+
WallClock m_last_start;
49+
};
50+
51+
52+
53+
}
54+
#endif

SerialPrograms/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ file(GLOB MAIN_SOURCES
184184
../Common/Cpp/Rectangle.tpp
185185
../Common/Cpp/RecursiveThrottler.h
186186
../Common/Cpp/SIMDDebuggers.h
187+
../Common/Cpp/Stopwatch.h
187188
../Common/Cpp/Sockets/AbstractClientSocket.h
188189
../Common/Cpp/Sockets/ClientSocket.cpp
189190
../Common/Cpp/Sockets/ClientSocket.h
@@ -536,8 +537,6 @@ file(GLOB MAIN_SOURCES
536537
Source/CommonTools/Audio/SpectrogramMatcher.h
537538
Source/CommonTools/DetectionDebouncer.h
538539
Source/CommonTools/FailureWatchdog.h
539-
Source/CommonTools/GlobalInferenceRunner.cpp
540-
Source/CommonTools/GlobalInferenceRunner.h
541540
Source/CommonTools/ImageMatch/CroppedImageDictionaryMatcher.cpp
542541
Source/CommonTools/ImageMatch/CroppedImageDictionaryMatcher.h
543542
Source/CommonTools/ImageMatch/ExactImageDictionaryMatcher.cpp

SerialPrograms/Source/CommonFramework/Options/Environment/PerformanceOptions.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,8 @@ class PerformanceOptions : public GroupOption{
7676
PA_ADD_OPTION(NORMAL_INFERENCE_PRIORITY);
7777
PA_ADD_OPTION(COMPUTE_PRIORITY);
7878

79-
// TODO: REMOVE: Enable when these are actually used.
8079
PA_ADD_OPTION(THREAD_POOL_REALTIME_INFERENCE);
81-
// PA_ADD_OPTION(THREAD_POOL_NORMAL_INFERENCE);
80+
PA_ADD_OPTION(THREAD_POOL_NORMAL_INFERENCE);
8281

8382
PA_ADD_OPTION(PROCESSOR_LEVEL);
8483
}

SerialPrograms/Source/CommonFramework/Options/ThreadPoolOption.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ ThreadPoolOption::ThreadPoolOption(
1515
ThreadPriority default_priority,
1616
double default_max_thread_ratio
1717
)
18-
: GroupOption(std::move(label), LockMode::UNLOCK_WHILE_RUNNING)
18+
: GroupOption(
19+
std::move(label),
20+
LockMode::UNLOCK_WHILE_RUNNING,
21+
EnableMode::ALWAYS_ENABLED,
22+
true
23+
)
1924
, m_default_max_threads(
2025
std::max(
2126
(size_t)(std::thread::hardware_concurrency() * default_max_thread_ratio),
@@ -27,6 +32,7 @@ ThreadPoolOption::ThreadPoolOption(
2732
LockMode::UNLOCK_WHILE_RUNNING,
2833
std::thread::hardware_concurrency()
2934
)
35+
, m_description("Restart program for changes to take full effect.")
3036
, PRIORITY("<b>Thread Priority:</b>", default_priority)
3137
, MAX_THREADS(
3238
"<b>Maximum Threads:</b>",
@@ -35,6 +41,7 @@ ThreadPoolOption::ThreadPoolOption(
3541
)
3642
{
3743
PA_ADD_OPTION(HARDWARE_THREADS);
44+
PA_ADD_STATIC(m_description);
3845
PA_ADD_OPTION(PRIORITY);
3946
PA_ADD_OPTION(MAX_THREADS);
4047
HARDWARE_THREADS.set_visibility(ConfigOptionState::HIDDEN);

0 commit comments

Comments
 (0)