Skip to content

Commit 208f61d

Browse files
author
Rafał Hibner
committed
Do not hold lock when backpressure is applied
1 parent 3f1dc3c commit 208f61d

File tree

1 file changed

+21
-20
lines changed

1 file changed

+21
-20
lines changed

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,30 +1060,31 @@ class AsofJoinNode : public ExecNode {
10601060
}
10611061

10621062
bool Process() {
1063-
std::lock_guard<std::mutex> guard(gate_);
1064-
if (!CheckEnded()) {
1065-
return false;
1066-
}
1067-
10681063
// Process batches while we have data
10691064
for (;;) {
10701065
backpressure_future_.Wait();
1071-
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1072-
1073-
if (result.ok()) {
1074-
auto out_rb = *result;
1075-
if (!out_rb) break;
1076-
ExecBatch out_b(*out_rb);
1077-
out_b.index = batches_produced_++;
1078-
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1079-
out_rb->ToString(), DEBUG_MANIP(std::endl));
1080-
Status st = output_->InputReceived(this, std::move(out_b));
1081-
if (!st.ok()) {
1082-
EndFromProcessThread(std::move(st));
1066+
{
1067+
std::lock_guard<std::mutex> guard(gate_);
1068+
if (!CheckEnded()) {
1069+
return false;
1070+
}
1071+
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1072+
1073+
if (result.ok()) {
1074+
auto out_rb = *result;
1075+
if (!out_rb) break;
1076+
ExecBatch out_b(*out_rb);
1077+
out_b.index = batches_produced_++;
1078+
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1079+
out_rb->ToString(), DEBUG_MANIP(std::endl));
1080+
Status st = output_->InputReceived(this, std::move(out_b));
1081+
if (!st.ok()) {
1082+
EndFromProcessThread(std::move(st));
1083+
}
1084+
} else {
1085+
EndFromProcessThread(result.status());
1086+
return false;
10831087
}
1084-
} else {
1085-
EndFromProcessThread(result.status());
1086-
return false;
10871088
}
10881089
}
10891090

0 commit comments

Comments
 (0)