2222#include < queue>
2323#include < vector>
2424
25+ #include " arrow/acero/query_context.h"
2526#include " arrow/compute/exec.h"
2627#include " arrow/util/logging_internal.h"
2728
@@ -160,6 +161,82 @@ class SerialSequencingQueueImpl : public SerialSequencingQueue {
160161 bool is_processing_ = false ;
161162};
162163
164+ class BackpressureProcessor : public SerialSequencingQueue ::Processor {
165+ private:
166+ struct DoHandle {
167+ explicit DoHandle (BackpressureProcessor& queue)
168+ : queue_(queue), start_size_(queue_.SizeUnlocked()) {}
169+
170+ ~DoHandle () {
171+ // unsynced access is safe since DoHandle is internally only used when the
172+ // lock is held
173+ size_t end_size = queue_.SizeUnlocked ();
174+ queue_.handler_ .Handle (start_size_, end_size);
175+ }
176+
177+ BackpressureProcessor& queue_;
178+ size_t start_size_;
179+ };
180+
181+ public:
182+ explicit BackpressureProcessor (SerialSequencingQueue::Processor* processor,
183+ BackpressureHandler handler, ExecPlan* plan,
184+ bool requires_io = true )
185+ : processor_(processor),
186+ handler_(std::move(handler)),
187+ plan_(plan),
188+ requires_io_(requires_io) {}
189+
190+ void Schedule () {
191+ if (requires_io_) {
192+ plan_->query_context ()->ScheduleIOTask ([this ]() { return DoProcess (); },
193+ " BackpressureProcessor::DoProcessIO" );
194+ } else {
195+ plan_->query_context ()->ScheduleTask ([this ]() { return DoProcess (); },
196+ " BackpressureProcessor::DoProcess" );
197+ }
198+ }
199+
200+ Status Process (ExecBatch batch) override {
201+ std::unique_lock lk (mutex_);
202+ {
203+ DoHandle do_handle (*this );
204+ sequenced_queue_.push (batch);
205+ }
206+ if (!is_processing_) {
207+ is_processing_ = true ;
208+ Schedule ();
209+ }
210+ return Status::OK ();
211+ }
212+
213+ private:
214+ Status DoProcess () {
215+ std::unique_lock lk (mutex_);
216+ while (!sequenced_queue_.empty ()) {
217+ ExecBatch next (sequenced_queue_.front ());
218+ {
219+ DoHandle do_handle (*this );
220+ sequenced_queue_.pop ();
221+ }
222+ lk.unlock ();
223+ ARROW_RETURN_NOT_OK (processor_->Process (std::move (next)));
224+ lk.lock ();
225+ }
226+ is_processing_ = false ;
227+ return Status::OK ();
228+ }
229+ size_t SizeUnlocked () const { return sequenced_queue_.size (); }
230+
231+ Processor* processor_;
232+ BackpressureHandler handler_;
233+ ExecPlan* plan_;
234+ bool requires_io_;
235+ std::mutex mutex_;
236+ std::queue<ExecBatch> sequenced_queue_;
237+ bool is_processing_ = false ;
238+ };
239+
163240} // namespace
164241
165242std::unique_ptr<SequencingQueue> SequencingQueue::Make (Processor* processor) {
@@ -170,6 +247,15 @@ std::unique_ptr<SerialSequencingQueue> SerialSequencingQueue::Make(Processor* pr
170247 return std::make_unique<SerialSequencingQueueImpl>(processor);
171248}
172249
250+ std::unique_ptr<SerialSequencingQueue::Processor>
251+ SerialSequencingQueue::Processor::MakeBackpressureWrapper (Processor* processor,
252+ BackpressureHandler handler,
253+ ExecPlan* plan,
254+ bool requires_io) {
255+ return std::make_unique<util::BackpressureProcessor>(processor, std::move (handler),
256+ plan);
257+ }
258+
173259} // namespace util
174260} // namespace acero
175261} // namespace arrow
0 commit comments