|
20 | 20 | #include <chrono> |
21 | 21 | #include <mutex> |
22 | 22 | #include <optional> |
23 | | -#include <vector> |
24 | 23 | #include <random> |
25 | 24 | #include <thread> |
| 25 | +#include <vector> |
26 | 26 |
|
27 | 27 | #include "arrow/array/builder_primitive.h" |
28 | 28 | #include "arrow/dataset/file_ipc.h" |
@@ -278,29 +278,31 @@ TEST_F(DatasetWriterTestFixture, BatchGreaterThanMaxRowsQueued) { |
278 | 278 | } |
279 | 279 |
|
280 | 280 | #pragma GCC push_options |
281 | | -#pragma GCC optimize ("O0") |
| 281 | +#pragma GCC optimize("O0") |
282 | 282 | TEST_F(DatasetWriterTestFixture, BatchWriteConcurrent) { |
283 | 283 | auto dataset_writer = MakeDatasetWriter(/*max_rows=*/5); |
284 | 284 |
|
285 | | - |
286 | | - for(int threads=1;threads<5;threads++){ |
287 | | - for(int iter=2;iter<=256;iter*=2){ |
288 | | - for(int batch=2;batch<=64;batch*=2){ |
| 285 | + for (int threads = 1; threads < 5; threads++) { |
| 286 | + for (int iter = 2; iter <= 256; iter *= 2) { |
| 287 | + for (int batch = 2; batch <= 64; batch *= 2) { |
289 | 288 | std::vector<std::thread> workers; |
290 | | - for(int i=0;i<threads;++i){ |
291 | | - workers.push_back(std::thread( |
292 | | - [&,i=i](){ |
293 | | - for(int j=0;j<iter;++j){ |
294 | | - while(paused_){SleepABit();}; |
295 | | - dataset_writer->WriteRecordBatch(MakeBatch(batch+i+10*j), ""); |
| 289 | + for (int i = 0; i < threads; ++i) { |
| 290 | + workers.push_back(std::thread([&, i = i]() { |
| 291 | + for (int j = 0; j < iter; ++j) { |
| 292 | + while (paused_) { |
| 293 | + SleepABit(); |
| 294 | + } |
| 295 | + dataset_writer->WriteRecordBatch(MakeBatch(batch + i + 10 * j), ""); |
296 | 296 | } |
297 | 297 | })); |
298 | 298 | } |
299 | | - for (std::thread &t: workers) { |
| 299 | + for (std::thread& t : workers) { |
300 | 300 | if (t.joinable()) { |
301 | 301 | t.join(); |
302 | 302 | } |
303 | | - while(paused_){SleepABit();}; |
| 303 | + while (paused_) { |
| 304 | + SleepABit(); |
| 305 | + } |
304 | 306 | } |
305 | 307 | } |
306 | 308 | } |
|
0 commit comments