diff --git a/Common/Cpp/Concurrency/ComputationThreadPool.cpp b/Common/Cpp/Concurrency/ComputationThreadPool.cpp index 70b0774dc..7806b62f5 100644 --- a/Common/Cpp/Concurrency/ComputationThreadPool.cpp +++ b/Common/Cpp/Concurrency/ComputationThreadPool.cpp @@ -64,7 +64,10 @@ void ComputationThreadPool::run_in_parallel( ){ m_core->run_in_parallel(func, start, end, block_size); } - +void ComputationThreadPool::stop() { + // force call stop in computation thread pool + m_core->stop(); +} diff --git a/Common/Cpp/Concurrency/ComputationThreadPool.h b/Common/Cpp/Concurrency/ComputationThreadPool.h index b71194dbd..6c5d4f82f 100644 --- a/Common/Cpp/Concurrency/ComputationThreadPool.h +++ b/Common/Cpp/Concurrency/ComputationThreadPool.h @@ -41,6 +41,8 @@ class ComputationThreadPool final{ void ensure_threads(size_t threads); + void stop(); + // void wait_for_everything(); diff --git a/Common/Cpp/Concurrency/ComputationThreadPoolCore.cpp b/Common/Cpp/Concurrency/ComputationThreadPoolCore.cpp index 54c5f45be..05b03adf4 100644 --- a/Common/Cpp/Concurrency/ComputationThreadPoolCore.cpp +++ b/Common/Cpp/Concurrency/ComputationThreadPoolCore.cpp @@ -31,21 +31,40 @@ ComputationThreadPoolCore::ComputationThreadPoolCore( spawn_thread(); } } -ComputationThreadPoolCore::~ComputationThreadPoolCore(){ + +void ComputationThreadPoolCore::stop() { { std::lock_guard lg(m_lock); + if (m_stopping) return; m_stopping = true; m_thread_cv.notify_all(); // m_dispatch_cv.notify_all(); } for (ThreadData& thread : m_threads){ - thread.thread.join(); + if (thread.thread.joinable()) { + thread.thread.join(); + } } + + // DO NOT JOIN AGAIN IN DESTRUCTOR + m_threads.clear(); + for (auto& task : m_queue){ task->report_cancelled(); } + + // DO NOT CLEAR AGAIN IN DESTRUCTOR + m_queue.clear(); + } +ComputationThreadPoolCore::~ComputationThreadPoolCore(){ + stop(); +} + + + + WallDuration ComputationThreadPoolCore::cpu_time() const{ // TODO: Don't lock the entire queue. WallDuration ret = WallDuration::zero(); diff --git a/Common/Cpp/Concurrency/ComputationThreadPoolCore.h b/Common/Cpp/Concurrency/ComputationThreadPoolCore.h index c72a71e24..c69312f51 100644 --- a/Common/Cpp/Concurrency/ComputationThreadPoolCore.h +++ b/Common/Cpp/Concurrency/ComputationThreadPoolCore.h @@ -47,6 +47,8 @@ class ComputationThreadPoolCore final{ WallDuration cpu_time() const; void ensure_threads(size_t threads); + + void stop(); // void wait_for_everything(); diff --git a/Common/Cpp/Concurrency/FireForgetDispatcher.cpp b/Common/Cpp/Concurrency/FireForgetDispatcher.cpp index a796ae00b..09cac78d0 100644 --- a/Common/Cpp/Concurrency/FireForgetDispatcher.cpp +++ b/Common/Cpp/Concurrency/FireForgetDispatcher.cpp @@ -22,14 +22,22 @@ FireForgetDispatcher global_dispatcher; FireForgetDispatcher::FireForgetDispatcher() : m_stopping(false) {} -FireForgetDispatcher::~FireForgetDispatcher(){ + +void FireForgetDispatcher::stop() { { std::lock_guard lg(m_lock); + // like with the other thread option, make sure we're not double stopping + if (m_stopping) return; m_stopping = true; m_cv.notify_all(); } m_thread.join(); } + +FireForgetDispatcher::~FireForgetDispatcher(){ + stop(); +} + void FireForgetDispatcher::dispatch(std::function&& func){ std::lock_guard lg(m_lock); m_queue.emplace_back(std::move(func)); diff --git a/Common/Cpp/Concurrency/FireForgetDispatcher.h b/Common/Cpp/Concurrency/FireForgetDispatcher.h index 4fbc45af9..d621a6e9a 100644 --- a/Common/Cpp/Concurrency/FireForgetDispatcher.h +++ b/Common/Cpp/Concurrency/FireForgetDispatcher.h @@ -25,6 +25,8 @@ class FireForgetDispatcher{ // Call "handle->wait()" to wait for the task to finish. void dispatch(std::function&& func); + void stop(); + private: void thread_loop(); diff --git a/Common/Cpp/Concurrency/Thread.cpp b/Common/Cpp/Concurrency/Thread.cpp index c22342d2a..4329480e8 100644 --- a/Common/Cpp/Concurrency/Thread.cpp +++ b/Common/Cpp/Concurrency/Thread.cpp @@ -82,6 +82,10 @@ void Thread::join(){ } +bool Thread::joinable() const{ + return m_data && m_data->m_thread.joinable(); +} + } diff --git a/Common/Cpp/Concurrency/Thread.h b/Common/Cpp/Concurrency/Thread.h index e0b9e8bdb..edbad47cc 100644 --- a/Common/Cpp/Concurrency/Thread.h +++ b/Common/Cpp/Concurrency/Thread.h @@ -35,6 +35,7 @@ class Thread{ return m_data; } void join(); + bool joinable() const; private: enum class State; diff --git a/SerialPrograms/Source/CommonFramework/Main.cpp b/SerialPrograms/Source/CommonFramework/Main.cpp index 1668f3a33..183d2e8ae 100644 --- a/SerialPrograms/Source/CommonFramework/Main.cpp +++ b/SerialPrograms/Source/CommonFramework/Main.cpp @@ -5,8 +5,10 @@ //#include #include #include "Common/Cpp/Concurrency/AsyncTask.h" +#include "Common/Cpp/Concurrency/FireForgetDispatcher.h" #include "Common/Cpp/Exceptions.h" #include "Common/Cpp/ImageResolution.h" +#include "CommonFramework/Tools/GlobalThreadPools.h" #include "Globals.h" #include "GlobalSettingsPanel.h" #include "PersistentSettings.h" @@ -149,6 +151,12 @@ int main(int argc, char *argv[]){ Integration::DppClient::Client::instance().disconnect(); #endif + // Force stop the thread pool + PokemonAutomation::GlobalThreadPools::realtime_inference().stop(); + PokemonAutomation::GlobalThreadPools::normal_inference().stop(); + + PokemonAutomation::global_dispatcher.stop(); + // We must clear the OCR cache or it will crash on Linux when the library // unloads before the cache is destructed from static memory. OCR::clear_cache();