Skip to content

Commit 935aaee

Browse files
author
Rafał Hibner
committed
Add Wrapper and fix empty weak case
1 parent 11d7170 commit 935aaee

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

cpp/src/arrow/acero/backpressure.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ void BackpressureCombiner::Resume(Source* output, bool strong_connection) {
6464

6565
void BackpressureCombiner::UpdatePauseStateUnlocked() {
6666
bool should_be_paused =
67-
strong_paused_count_ > 0 || weak_paused_count_ == weak_paused_.size();
67+
strong_paused_count_ > 0 ||
68+
(weak_paused_count_ > 0 && weak_paused_count_ == weak_paused_.size());
6869
if (should_be_paused) {
6970
if (!paused) {
7071
backpressure_control_->Pause();

cpp/src/arrow/acero/backpressure.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
namespace arrow::acero {
2323

2424
// Generic backpressure controller for ExecNode
25-
class BackpressureController : public BackpressureControl {
25+
class ARROW_ACERO_EXPORT BackpressureController : public BackpressureControl {
2626
public:
2727
BackpressureController(ExecNode* node, ExecNode* output,
2828
std::atomic<int32_t>& backpressure_counter);
@@ -36,6 +36,18 @@ class BackpressureController : public BackpressureControl {
3636
std::atomic<int32_t>& backpressure_counter_;
3737
};
3838

39+
template <typename T>
40+
class ARROW_ACERO_EXPORT BackpressureControlWrapper : public BackpressureControl {
41+
public:
42+
explicit BackpressureControlWrapper(T* obj) : obj_(obj) {}
43+
44+
void Pause() override { obj_->Pause(); }
45+
void Resume() override { obj_->Resume(); }
46+
47+
private:
48+
T* obj_;
49+
};
50+
3951
// Provides infrastructure of combining multiple backpressure sources and propagate the
4052
// result into BackpressureControl There are two types of Source: strong - pause on any
4153
// strong Source within controller

cpp/src/arrow/acero/backpressure_test.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,44 @@ TEST(BackpressureCombiner, Basic) {
8181
ASSERT_FALSE(paused);
8282
}
8383

84+
TEST(BackpressureCombiner, OnlyStrong) {
85+
std::atomic<bool> paused{false};
86+
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
87+
BackpressureCombiner::Source strong_source1(&combiner);
88+
BackpressureCombiner::Source strong_source2;
89+
strong_source2.AddController(&combiner);
90+
91+
// Any strong causes pause
92+
ASSERT_FALSE(paused);
93+
strong_source1.Pause();
94+
ASSERT_TRUE(paused);
95+
strong_source2.Pause();
96+
ASSERT_TRUE(paused);
97+
strong_source1.Resume();
98+
ASSERT_TRUE(paused);
99+
strong_source2.Resume();
100+
ASSERT_FALSE(paused);
101+
}
102+
103+
TEST(BackpressureCombiner, OnlyWeak) {
104+
std::atomic<bool> paused{false};
105+
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
106+
107+
BackpressureCombiner::Source weak_source1(&combiner, false);
108+
BackpressureCombiner::Source weak_source2;
109+
weak_source2.AddController(&combiner, false);
110+
111+
// All weak cause pause
112+
ASSERT_FALSE(paused);
113+
weak_source1.Pause();
114+
ASSERT_FALSE(paused);
115+
weak_source2.Pause();
116+
ASSERT_TRUE(paused);
117+
weak_source1.Resume();
118+
ASSERT_FALSE(paused);
119+
weak_source2.Resume();
120+
ASSERT_FALSE(paused);
121+
}
122+
84123
} // namespace acero
85124
} // namespace arrow

0 commit comments

Comments
 (0)