Skip to content

Commit 0624fbb

Browse files
author
Rafał Hibner
committed
Do only one check for backpressure
1 parent d0cb912 commit 0624fbb

File tree

2 files changed

+16
-18
lines changed

2 files changed

+16
-18
lines changed

cpp/src/arrow/dataset/dataset_writer.cc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class Throttle {
5454

5555
bool Unthrottled() const { return max_value_ <= 0; }
5656

57-
Future<> Acquire(uint64_t values) {
57+
std::optional<Future<>> Acquire(uint64_t values) {
5858
if (Unthrottled()) {
5959
return Future<>::MakeFinished();
6060
}
@@ -64,6 +64,8 @@ class Throttle {
6464
backpressure_ = Future<>::Make();
6565
} else {
6666
current_value_ += values;
67+
DCHECK(backpressure_.is_finished());
68+
return std::nullopt;
6769
}
6870
return backpressure_;
6971
}
@@ -72,9 +74,6 @@ class Throttle {
7274
if (Unthrottled()) {
7375
return;
7476
}
75-
if(current_value_<values){
76-
std::cout<<"Release: "<<current_value_<<"<"<<values<<std::endl;
77-
}
7877
Future<> to_complete;
7978
{
8079
std::lock_guard<std::mutex> lg(mutex_);
@@ -666,7 +665,7 @@ class DatasetWriter::DatasetWriterImpl {
666665
directory, prefix);
667666
}));
668667
std::shared_ptr<DatasetWriterDirectoryQueue> dir_queue = dir_queue_itr->second;
669-
Future<> backpressure;
668+
std::optional<Future<>> backpressure;
670669
while (batch) {
671670
// Keep opening new files until batch is done.
672671
std::shared_ptr<RecordBatch> remainder;
@@ -685,13 +684,13 @@ class DatasetWriter::DatasetWriterImpl {
685684
}
686685
backpressure =
687686
writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows());
688-
if (!backpressure.is_finished()) {
687+
if (backpressure) {
689688
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
690689
break;
691690
}
692691
if (will_open_file) {
693692
backpressure = writer_state_->open_files_throttle.Acquire(1);
694-
if (!backpressure.is_finished()) {
693+
if (backpressure) {
695694
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
696695
writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
697696
RETURN_NOT_OK(TryCloseLargestFile());
@@ -715,7 +714,8 @@ class DatasetWriter::DatasetWriterImpl {
715714
}
716715

717716
if (batch) {
718-
return backpressure.Then([this, batch, directory, prefix] {
717+
DCHECK(backpressure);
718+
return backpressure->Then([this, batch, directory, prefix] {
719719
return DoWriteRecordBatch(batch, directory, prefix);
720720
});
721721
}

cpp/src/arrow/dataset/dataset_writer_test.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -280,32 +280,30 @@ TEST_F(DatasetWriterTestFixture, BatchGreaterThanMaxRowsQueued) {
280280
#pragma GCC push_options
281281
#pragma GCC optimize ("O0")
282282
TEST_F(DatasetWriterTestFixture, BatchWriteConcurrent) {
283-
auto dataset_writer = MakeDatasetWriter(/*max_rows=*/200);
283+
auto dataset_writer = MakeDatasetWriter(/*max_rows=*/5);
284284

285-
286-
for(int threads=20;threads>1;threads--){
287-
for(int iter=2;iter<100;iter*=2){
288-
for(int batch=2;batch<5000;batch*=2){
289-
std::cout<<threads<<" "<<iter<<" "<<batch<<std::endl;
285+
286+
for(int threads=1;threads<5;threads++){
287+
for(int iter=2;iter<=256;iter*=2){
288+
for(int batch=2;batch<=64;batch*=2){
290289
std::vector<std::thread> workers;
291290
for(int i=0;i<threads;++i){
292291
workers.push_back(std::thread(
293-
[&](){
292+
[&,i=i](){
294293
for(int j=0;j<iter;++j){
295294
while(paused_){SleepABit();};
296-
dataset_writer->WriteRecordBatch(MakeBatch(batch/threads), "");
295+
dataset_writer->WriteRecordBatch(MakeBatch(batch+i+10*j), "");
297296
}
298297
}));
299298
}
300299
for (std::thread &t: workers) {
301300
if (t.joinable()) {
302301
t.join();
303-
}
302+
}
304303
while(paused_){SleepABit();};
305304
}
306305
}
307306
}
308-
309307
}
310308
EndWriterChecked(dataset_writer.get());
311309
ASSERT_EQ(paused_, false);

0 commit comments

Comments
 (0)