Skip to content

Commit adcbe2c

Browse files
author
Rafał Hibner
committed
Use arrow mutex implementation
1 parent d972d5b commit adcbe2c

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

cpp/src/arrow/acero/pipe_node.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ Pipe::Pipe(ExecPlan* plan, std::string pipe_name,
275275
const Ordering& Pipe::ordering() const { return ordering_; }
276276

277277
void Pipe::Pause(PipeSource* output, int counter) {
278-
std::lock_guard<std::mutex> lg(mutex_);
278+
auto lock = mutex_.Lock();
279279
auto& state = state_[output];
280280
if (state.backpressure_counter < counter) {
281281
if (!state.paused && !state.stopped) {
@@ -295,7 +295,7 @@ void Pipe::Pause(PipeSource* output, int counter) {
295295
}
296296

297297
void Pipe::Resume(PipeSource* output, int counter) {
298-
std::lock_guard<std::mutex> lg(mutex_);
298+
auto lock = mutex_.Lock();
299299
auto& state = state_[output];
300300
if (state.backpressure_counter < counter) {
301301
state.backpressure_counter = counter;
@@ -320,7 +320,7 @@ void Pipe::DoResume(SourceState& state) {
320320
}
321321

322322
Status Pipe::StopProducing(PipeSource* output) {
323-
std::lock_guard<std::mutex> lg(mutex_);
323+
auto lock = mutex_.Lock();
324324
auto& state = state_[output];
325325
DCHECK(!state.stopped);
326326
DoResume(state);
@@ -341,7 +341,7 @@ Status Pipe::StopProducing(PipeSource* output) {
341341
Status Pipe::InputReceived(ExecBatch batch) {
342342
for (auto& source_node : async_nodes_) {
343343
{
344-
std::lock_guard<std::mutex> lg(mutex_);
344+
auto lock = mutex_.Lock();
345345
if (state_[source_node].stopped) continue;
346346
}
347347
plan_->query_context()->ScheduleTask(

cpp/src/arrow/acero/pipe_node.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
// under the License.
1717

1818
#pragma once
19-
#include <mutex>
2019
#include <string>
2120
#include "arrow/acero/exec_plan.h"
2221
#include "arrow/acero/options.h"
2322
#include "arrow/acero/visibility.h"
23+
#include "arrow/util/mutex.h"
2424

2525
namespace arrow {
2626

@@ -114,10 +114,10 @@ class ARROW_ACERO_EXPORT Pipe {
114114
Ordering ordering_;
115115
std::string pipe_name_;
116116
std::vector<PipeSource*> async_nodes_;
117-
PipeSource* sync_node_{nullptr};
117+
PipeSource* sync_node_{NULLPTR};
118118
// backpressure
119119
std::unordered_map<PipeSource*, SourceState> state_;
120-
std::mutex mutex_;
120+
util::Mutex mutex_;
121121
std::atomic_size_t paused_count_{0};
122122
std::unique_ptr<BackpressureControl> ctrl_;
123123
// stopProducing

0 commit comments

Comments
 (0)