Skip to content

Commit c76049f

Browse files
committed
Parallel framework refactor.
1 parent b518702 commit c76049f

17 files changed

+533
-411
lines changed

Common/Cpp/Concurrency/AsyncDispatcher.cpp

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ AsyncDispatcher::~AsyncDispatcher(){
4242
thread.join();
4343
}
4444
for (AsyncTask* task : m_queue){
45-
task->signal();
45+
task->report_cancelled();
46+
// task->signal();
4647
}
4748
}
4849

@@ -58,7 +59,7 @@ void AsyncDispatcher::dispatch_task(AsyncTask& task){
5859
std::lock_guard<std::mutex> lg(m_lock);
5960

6061
// Enqueue task.
61-
m_queue.emplace_back(&task);
62+
m_queue.emplace_back(&task)->report_started();
6263

6364
// Make sure a thread is ready for it.
6465
if (m_queue.size() > m_threads.size() - m_busy_count){
@@ -97,7 +98,7 @@ void AsyncDispatcher::run_in_parallel(
9798

9899
// Enqueue tasks.
99100
for (std::unique_ptr<AsyncTask>& task : tasks){
100-
m_queue.emplace_back(task.get());
101+
m_queue.emplace_back(task.get())->report_started();
101102
}
102103

103104
// Make sure there are enough threads.
@@ -153,18 +154,7 @@ void AsyncDispatcher::thread_loop(){
153154
m_busy_count++;
154155
}
155156

156-
try{
157-
task->m_task();
158-
}catch (...){
159-
task->m_exception = std::current_exception();
160-
task->m_stopped_with_error.store(true, std::memory_order_release);
161-
// cout << "Task threw an exception." << endl;
162-
// std::lock_guard<std::mutex> lg(m_lock);
163-
// for (AsyncTask* t : m_queue){
164-
// t->signal();
165-
// }
166-
}
167-
task->signal();
157+
task->run();
168158
}
169159
// cout << "AsyncDispatcher::thread_loop() End (outside) = " << GetCurrentThreadId() << endl;
170160
}

Common/Cpp/Concurrency/AsyncTask.cpp

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,52 @@
66

77
#include "AsyncTask.h"
88

9+
//#include <iostream>
10+
//using std::cout;
11+
//using std::endl;
12+
913
namespace PokemonAutomation{
1014

1115

1216

1317
AsyncTask::~AsyncTask(){
14-
std::unique_lock<std::mutex> lg(m_lock);
15-
m_cv.wait(lg, [this]{ return m_finished; });
16-
}
17-
bool AsyncTask::is_finished() const{
18-
std::lock_guard<std::mutex> lg(m_lock);
19-
return m_finished;
20-
}
21-
void AsyncTask::rethrow_exceptions(){
22-
if (!m_stopped_with_error.load(std::memory_order_acquire)){
18+
State state = m_state.load(std::memory_order_acquire);
19+
if (state == State::NOT_STARTED || state == State::SAFE_TO_DESTRUCT){
20+
// cout << "Already Done: " << (int)state << endl;
2321
return;
2422
}
25-
std::unique_lock<std::mutex> lg(m_lock);
26-
if (m_exception){
27-
std::rethrow_exception(m_exception);
23+
24+
{
25+
std::unique_lock<std::mutex> lg(m_lock);
26+
m_cv.wait(lg, [this]{
27+
return m_state.load(std::memory_order_relaxed) != State::RUNNING;
28+
});
2829
}
29-
}
30-
void AsyncTask::wait_and_rethrow_exceptions(){
31-
std::unique_lock<std::mutex> lg(m_lock);
32-
m_cv.wait(lg, [this]{ return m_finished; });
33-
if (m_exception){
34-
std::rethrow_exception(m_exception);
30+
31+
while (m_state.load(std::memory_order_acquire) != State::SAFE_TO_DESTRUCT){
32+
// pause
3533
}
34+
35+
// cout << "Late Finish" << endl;
3636
}
37-
void AsyncTask::signal(){
38-
std::lock_guard<std::mutex> lg(m_lock);
39-
m_finished = true;
37+
38+
39+
void AsyncTask::report_cancelled() noexcept{
40+
m_state.store(State::FINISHED, std::memory_order_release);
41+
{
42+
std::lock_guard<std::mutex> lg(m_lock);
43+
}
4044
m_cv.notify_all();
45+
m_state.store(State::SAFE_TO_DESTRUCT, std::memory_order_release);
46+
}
47+
void AsyncTask::run() noexcept{
48+
try{
49+
m_task();
50+
}catch (...){
51+
std::lock_guard<std::mutex> lg(m_lock);
52+
m_exception = std::current_exception();
53+
}
54+
report_cancelled();
4155
}
4256

4357

Common/Cpp/Concurrency/AsyncTask.h

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,57 @@ namespace PokemonAutomation{
1818

1919
class AsyncTask{
2020
public:
21-
// Wait for the task to finish before destructing. Doesn't rethrow exceptions.
22-
~AsyncTask();
23-
24-
bool is_finished() const;
21+
enum class State{
22+
NOT_STARTED,
23+
RUNNING,
24+
FINISHED,
25+
SAFE_TO_DESTRUCT,
26+
};
2527

26-
// If the task ended with an exception, rethrow it here.
27-
// This does not clear the exception.
28-
void rethrow_exceptions();
2928

30-
// Wait for the task to finish. Will rethrow any exceptions.
31-
void wait_and_rethrow_exceptions();
29+
public:
30+
// If the task has already started, this will wait for it to finish.
31+
// This will not rethrow exceptions.
32+
~AsyncTask();
3233

3334

34-
private:
35+
public:
3536
template <class... Args>
3637
AsyncTask(Args&&... args)
3738
: m_task(std::forward<Args>(args)...)
38-
, m_finished(false)
39-
, m_stopped_with_error(false)
39+
, m_state(State::NOT_STARTED)
4040
{}
41-
void signal();
4241

43-
private:
44-
friend class FireForgetDispatcher;
45-
friend class AsyncDispatcher;
46-
friend class ComputationThreadPoolCore;
42+
bool is_finished() const noexcept{
43+
State state = m_state.load(std::memory_order_acquire);
44+
return state == State::FINISHED || state == State::SAFE_TO_DESTRUCT;
45+
}
4746

47+
// Wait for the task to finish. Will rethrow any exceptions.
48+
void wait_and_rethrow_exceptions(){
49+
if (!is_finished()){
50+
std::unique_lock<std::mutex> lg(m_lock);
51+
m_cv.wait(lg, [this]{ return is_finished(); });
52+
}
53+
if (m_exception){
54+
std::rethrow_exception(m_exception);
55+
}
56+
}
57+
58+
59+
public:
60+
// These should only be called inside a parallel framework.
61+
// These are not thread-safe with each other.
62+
void report_started(){
63+
m_state.store(State::RUNNING, std::memory_order_release);
64+
}
65+
void report_cancelled() noexcept;
66+
void run() noexcept;
67+
68+
69+
private:
4870
std::function<void()> m_task;
49-
bool m_finished;
50-
std::atomic<bool> m_stopped_with_error;
71+
std::atomic<State> m_state;
5172
std::exception_ptr m_exception;
5273
mutable std::mutex m_lock;
5374
std::condition_variable m_cv;

0 commit comments

Comments
 (0)