1818#pragma once
1919
2020#include < condition_variable>
21- #include < mutex>
2221#include < queue>
2322#include " arrow/acero/backpressure_handler.h"
23+ #include " arrow/util/mutex.h"
2424
2525namespace arrow ::acero {
2626
@@ -34,32 +34,32 @@ class ConcurrentQueue {
3434 // Pops the last item from the queue but waits if the queue is empty until new items are
3535 // pushed.
3636 T WaitAndPop () {
37- std::unique_lock<std::mutex> lock ( mutex_);
38- WaitUntilNonEmpty (lock );
37+ arrow::util::Mutex::Guard guard = mutex_. Lock ( );
38+ WaitUntilNonEmpty (guard );
3939 return PopUnlocked ();
4040 }
4141
4242 // Pops the last item from the queue, or returns a nullopt if empty
4343 std::optional<T> TryPop () {
44- std::unique_lock<std::mutex> lock ( mutex_);
44+ arrow::util::Mutex::Guard guard = mutex_. Lock ( );
4545 return TryPopUnlocked ();
4646 }
4747
4848 // Pushes an item to the queue
4949 void Push (const T& item) {
50- std::unique_lock<std::mutex> lock ( mutex_);
50+ arrow::util::Mutex::Guard guard = mutex_. Lock ( );
5151 return PushUnlocked (item);
5252 }
5353
5454 // Clears the queue
5555 void Clear () {
56- std::unique_lock<std::mutex> lock ( mutex_);
56+ arrow::util::Mutex::Guard guard = mutex_. Lock ( );
5757 ClearUnlocked ();
5858 }
5959
6060 // Checks if the queue is empty
6161 bool Empty () const {
62- std::unique_lock<std::mutex> lock ( mutex_);
62+ arrow::util::Mutex::Guard guard = mutex_. Lock ( );
6363 return queue_.empty ();
6464 }
6565
@@ -69,17 +69,17 @@ class ConcurrentQueue {
6969 // Need to lock the queue because `front()` may be implemented in terms
7070 // of `begin()`, which isn't safe with concurrent calls to e.g. `push()`.
7171 // (see GH-44846)
72- std::unique_lock<std::mutex> lock ( mutex_);
72+ arrow::util::Mutex::Guard guard = mutex_. Lock ( );
7373 return queue_.front ();
7474 }
7575
7676 protected:
77- std::mutex& GetMutex () { return mutex_; }
77+ arrow::util::Mutex::Guard Lock () { return mutex_. Lock () ; }
7878
7979 size_t SizeUnlocked () const { return queue_.size (); }
8080
81- void WaitUntilNonEmpty (std::unique_lock<std::mutex>& lock ) {
82- cond_ .wait (lock , [&] { return !queue_.empty (); });
81+ void WaitUntilNonEmpty (arrow::util::Mutex::Guard& guard ) {
82+ guard .wait (cond_ , [&] { return !queue_.empty (); });
8383 }
8484
8585 T PopUnlocked () {
@@ -108,7 +108,7 @@ class ConcurrentQueue {
108108 std::queue<T> queue_;
109109
110110 private:
111- mutable std::mutex mutex_;
111+ mutable arrow::util::Mutex mutex_;
112112 std::condition_variable cond_;
113113};
114114
@@ -137,29 +137,29 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
137137 // Pops the last item from the queue but waits if the queue is empty until new items are
138138 // pushed.
139139 T WaitAndPop () {
140- std::unique_lock<std::mutex> lock ( ConcurrentQueue<T>::GetMutex () );
141- ConcurrentQueue<T>::WaitUntilNonEmpty (lock );
140+ arrow::util::Mutex::Guard guard = ConcurrentQueue<T>::Lock ( );
141+ ConcurrentQueue<T>::WaitUntilNonEmpty (guard );
142142 DoHandle do_handle (*this );
143143 return ConcurrentQueue<T>::PopUnlocked ();
144144 }
145145
146146 // Pops the last item from the queue, or returns a nullopt if empty
147147 std::optional<T> TryPop () {
148- std::unique_lock<std::mutex> lock ( ConcurrentQueue<T>::GetMutex () );
148+ arrow::util::Mutex::Guard guard = ConcurrentQueue<T>::Lock ( );
149149 DoHandle do_handle (*this );
150150 return ConcurrentQueue<T>::TryPopUnlocked ();
151151 }
152152
153153 // Pushes an item to the queue
154154 void Push (const T& item) {
155- std::unique_lock<std::mutex> lock ( ConcurrentQueue<T>::GetMutex () );
155+ arrow::util::Mutex::Guard guard = ConcurrentQueue<T>::Lock ( );
156156 DoHandle do_handle (*this );
157157 ConcurrentQueue<T>::PushUnlocked (item);
158158 }
159159
160160 // Clears the queue
161161 void Clear () {
162- std::unique_lock<std::mutex> lock ( ConcurrentQueue<T>::GetMutex () );
162+ arrow::util::Mutex::Guard guard = ConcurrentQueue<T>::Lock ( );
163163 DoHandle do_handle (*this );
164164 ConcurrentQueue<T>::ClearUnlocked ();
165165 }
0 commit comments