@@ -466,6 +466,70 @@ TEST(ExecPlanExecution, SinkNodeBackpressure) {
466466 CheckFinishesCancelledOrOk (plan->finished ());
467467}
468468
469+ TEST (ExecPlanExecution, SourceCloseGenerator) {
470+ std::optional<ExecBatch> batch =
471+ ExecBatchFromJSON ({int32 (), boolean ()},
472+ " [[4, false], [5, null], [6, false], [7, false], [null, true]]" );
473+ constexpr uint32_t kPauseIfAbove = 4 ;
474+ constexpr uint32_t kResumeIfBelow = 2 ;
475+ uint32_t pause_if_above_bytes =
476+ kPauseIfAbove * static_cast <uint32_t >(batch->TotalBufferSize ());
477+ uint32_t resume_if_below_bytes =
478+ kResumeIfBelow * static_cast <uint32_t >(batch->TotalBufferSize ());
479+ EXPECT_OK_AND_ASSIGN (std::shared_ptr<ExecPlan> plan, ExecPlan::Make ());
480+ PushGenerator<std::optional<ExecBatch>> batch_producer;
481+ AsyncGenerator<std::optional<ExecBatch>> sink_gen;
482+ BackpressureMonitor* backpressure_monitor;
483+ BackpressureOptions backpressure_options (resume_if_below_bytes, pause_if_above_bytes);
484+ std::shared_ptr<Schema> schema_ = schema ({field (" data" , uint32 ())});
485+ ARROW_EXPECT_OK (
486+ acero::Declaration::Sequence (
487+ {
488+ {" source" , SourceNodeOptions (schema_, batch_producer)},
489+ {" sink" , SinkNodeOptions{&sink_gen, /* schema=*/ nullptr ,
490+ backpressure_options, &backpressure_monitor}},
491+ })
492+ .AddToPlan (plan.get ()));
493+ ASSERT_TRUE (backpressure_monitor);
494+ plan->StartProducing ();
495+
496+ ASSERT_FALSE (backpressure_monitor->is_paused ());
497+
498+ // Should be able to push kPauseIfAbove batches without triggering back pressure
499+ for (uint32_t i = 0 ; i < kPauseIfAbove ; i++) {
500+ batch_producer.producer ().Push (batch);
501+ }
502+ SleepABit ();
503+ ASSERT_FALSE (backpressure_monitor->is_paused ());
504+
505+ // One more batch should trigger back pressure
506+ batch_producer.producer ().Push (batch);
507+ BusyWait (10 , [&] { return backpressure_monitor->is_paused (); });
508+ ASSERT_TRUE (backpressure_monitor->is_paused ());
509+
510+ batch_producer.producer ().Push (batch);
511+ batch_producer.producer ().Close ();
512+
513+ // Reading as much as we can while keeping it paused
514+ for (uint32_t i = kPauseIfAbove ; i >= kResumeIfBelow ; i--) {
515+ ASSERT_FINISHES_OK (sink_gen ());
516+ }
517+ SleepABit ();
518+ ASSERT_TRUE (backpressure_monitor->is_paused ());
519+
520+ // Reading one more item should open up backpressure
521+ ASSERT_FINISHES_OK (sink_gen ());
522+ ASSERT_FINISHES_OK (sink_gen ());
523+ BusyWait (10 , [&] { return !backpressure_monitor->is_paused (); });
524+ ASSERT_FALSE (backpressure_monitor->is_paused ());
525+
526+ plan->StopProducing ();
527+ ASSERT_FINISHES_OK (sink_gen ());
528+
529+ // Cleanup
530+ CheckFinishesCancelledOrOk (plan->finished ());
531+ }
532+
469533TEST (ExecPlan, ToString) {
470534 auto basic_data = MakeBasicBatches ();
471535 AsyncGenerator<std::optional<ExecBatch>> sink_gen;
@@ -1788,75 +1852,36 @@ TEST(ExecPlanExecution, UnalignedInput) {
17881852 ASSERT_LT (initial_bytes_allocated, default_memory_pool ()->total_bytes_allocated ());
17891853}
17901854
1791- struct ExecPlanErrorReporting : public testing ::TestWithParam<DummyNodeStatusReporter> {};
1792-
1793- TEST_P (ExecPlanErrorReporting, SourceSink) {
1794- ASSERT_OK_AND_ASSIGN (auto plan, ExecPlan::Make ());
1795- auto source = MakeDummyNode (plan.get (), " source" , /* inputs=*/ {}, /* is_sink=*/ false ,
1796- /* start_producing =*/ {},
1797- /* stop_producing =*/ {},
1798- /* status_reporter =*/ GetParam ());
1799- auto sink = MakeDummyNode (plan.get (), " sink" , /* inputs=*/ {source}, /* is_sink=*/ true ,
1800- /* start_producing =*/ {},
1801- /* stop_producing =*/ {},
1802- /* status_reporter =*/ GetParam ());
1803-
1804- ASSERT_OK (plan->Validate ());
1805- EXPECT_THAT (plan->nodes (), ElementsAre (source, sink));
1806-
1807- bool should_finish = GetParam ().start_producing .ok ();
1808- plan->StartProducing ();
1809- SleepABit ();
1810- if (should_finish)
1811- ASSERT_FINISHES_OK (plan->finished ());
1812- else
1813- ASSERT_FINISHES_AND_RAISES (Invalid, plan->finished ());
1814- }
1815-
1816- TEST_P (ExecPlanErrorReporting, InputReceived) {
1817- ASSERT_OK_AND_ASSIGN (auto plan, ExecPlan::Make ());
1818- auto basic_data = MakeBasicBatches ();
1819-
1820- ASSERT_OK_AND_ASSIGN (
1821- auto source,
1822- Declaration (" source" ,
1823- SourceNodeOptions{basic_data.schema , basic_data.gen (/* parallel=*/ false ,
1824- /* slow=*/ false )})
1825- .AddToPlan (plan.get ()));
1826-
1827- MakeDummyNode (plan.get (), " sink" , /* inputs=*/ {source}, /* is_sink=*/ true ,
1828- /* start_producing =*/ {},
1829- /* stop_producing =*/ {},
1830- /* status_reporter =*/ GetParam ());
1831-
1832- bool should_finish = GetParam ().start_producing .ok () &&
1833- GetParam ().input_received .ok () && GetParam ().input_finished .ok ();
1834- plan->StartProducing ();
1835- SleepABit ();
1836- if (should_finish)
1837- ASSERT_FINISHES_OK (plan->finished ());
1838- else
1839- ASSERT_FINISHES_AND_RAISES (Invalid, plan->finished ());
1840- }
1855+ struct ExecPlanErrorReporting : public testing ::TestWithParam<DummyNodeStatusReporter> {
1856+ public:
1857+ void SetUp () {
1858+ auto schema_ = schema ({field (" data" , uint32 ())});
1859+ batch = ExecBatchFromJSON (
1860+ {int32 (), boolean ()},
1861+ " [[4, false], [5, null], [6, false], [7, false], [null, true]]" );
1862+
1863+ ASSERT_OK_AND_ASSIGN (plan, ExecPlan::Make ());
1864+ ASSERT_OK_AND_ASSIGN (source,
1865+ Declaration (" source" , SourceNodeOptions (schema_, batch_producer))
1866+ .AddToPlan (plan.get ()));
1867+ sink = MakeDummyNode (plan.get (), " sink" , /* inputs=*/ {source}, /* is_sink=*/ true ,
1868+ /* start_producing =*/ {},
1869+ /* stop_producing =*/ {},
1870+ /* status_reporter =*/ GetParam ());
1871+
1872+ ASSERT_OK (plan->Validate ());
1873+ EXPECT_THAT (plan->nodes (), ElementsAre (source, sink));
1874+ }
18411875
1842- TEST_P (ExecPlanErrorReporting, Finish) {
1843- std::shared_ptr<Schema> schema_ = schema ({field (" data" , uint32 ())});
1844- std::optional<ExecBatch> batch =
1845- ExecBatchFromJSON ({int32 (), boolean ()},
1846- " [[4, false], [5, null], [6, false], [7, false], [null, true]]" );
1876+ protected:
1877+ std::shared_ptr<ExecPlan> plan;
18471878 PushGenerator<std::optional<ExecBatch>> batch_producer;
1848- ASSERT_OK_AND_ASSIGN (auto plan, ExecPlan::Make ());
1849- auto basic_data = MakeBasicBatches ();
1850-
1851- ASSERT_OK_AND_ASSIGN (auto source,
1852- Declaration (" source" , SourceNodeOptions (schema_, batch_producer))
1853- .AddToPlan (plan.get ()));
1854-
1855- MakeDummyNode (plan.get (), " sink" , /* inputs=*/ {source}, /* is_sink=*/ true ,
1856- /* start_producing =*/ {},
1857- /* stop_producing =*/ {},
1858- /* status_reporter =*/ GetParam ());
1879+ ExecNode* source;
1880+ ExecNode* sink;
1881+ std::optional<ExecBatch> batch;
1882+ };
18591883
1884+ TEST_P (ExecPlanErrorReporting, StopBeforeFinish) {
18601885 bool should_start = GetParam ().start_producing .ok ();
18611886 plan->StartProducing ();
18621887 SleepABit ();
@@ -1867,43 +1892,20 @@ TEST_P(ExecPlanErrorReporting, Finish) {
18671892 }
18681893 batch_producer.producer ().Push (batch);
18691894 SleepABit ();
1870-
1871- bool should_receive = should_start && GetParam ().input_received .ok ();
1872- if (should_receive) {
1873- ASSERT_FALSE (plan->finished ().is_finished ());
1874- } else {
1875- ASSERT_FINISHES_AND_RAISES (Invalid, plan->finished ());
1876- }
1877-
1878- batch_producer.producer ().Push (std::nullopt );
1895+ plan->StopProducing ();
18791896 SleepABit ();
1880- bool should_finish = should_receive && GetParam ().input_finished .ok ();
1881- if (should_finish) {
1882- ASSERT_FINISHES_OK (plan->finished ());
1897+ batch_producer.producer ().Close ();
1898+ bool should_stop = should_start && GetParam ().input_received .ok () &&
1899+ GetParam ().stop_producing .ok () && GetParam ().input_finished .ok ();
1900+ if (should_stop) {
1901+ ASSERT_FINISHES_AND_RAISES (Cancelled, plan->finished ());
18831902 } else {
18841903 ASSERT_FINISHES_AND_RAISES (Invalid, plan->finished ());
18851904 return ;
18861905 }
18871906}
18881907
1889- TEST_P (ExecPlanErrorReporting, StopProducing) {
1890- std::shared_ptr<Schema> schema_ = schema ({field (" data" , uint32 ())});
1891- std::optional<ExecBatch> batch =
1892- ExecBatchFromJSON ({int32 (), boolean ()},
1893- " [[4, false], [5, null], [6, false], [7, false], [null, true]]" );
1894- PushGenerator<std::optional<ExecBatch>> batch_producer;
1895- ASSERT_OK_AND_ASSIGN (auto plan, ExecPlan::Make ());
1896- auto basic_data = MakeBasicBatches ();
1897-
1898- ASSERT_OK_AND_ASSIGN (auto source,
1899- Declaration (" source" , SourceNodeOptions (schema_, batch_producer))
1900- .AddToPlan (plan.get ()));
1901-
1902- MakeDummyNode (plan.get (), " sink" , /* inputs=*/ {source}, /* is_sink=*/ true ,
1903- /* start_producing =*/ {},
1904- /* stop_producing =*/ {},
1905- /* status_reporter =*/ GetParam ());
1906-
1908+ TEST_P (ExecPlanErrorReporting, StopAfterFinish) {
19071909 bool should_start = GetParam ().start_producing .ok ();
19081910 plan->StartProducing ();
19091911 SleepABit ();
@@ -1915,21 +1917,14 @@ TEST_P(ExecPlanErrorReporting, StopProducing) {
19151917 batch_producer.producer ().Push (batch);
19161918 SleepABit ();
19171919
1918- bool should_receive = should_start && GetParam ().input_received .ok ();
1919- if (should_receive) {
1920- ASSERT_FALSE (plan->finished ().is_finished ());
1921- } else {
1922- ASSERT_FINISHES_AND_RAISES (Invalid, plan->finished ());
1923- }
1924- // this should not be needed
1925- batch_producer.producer ().Push (std::nullopt );
1920+ batch_producer.producer ().Close ();
19261921 SleepABit ();
19271922 plan->StopProducing ();
19281923 SleepABit ();
1929- bool should_stop =
1930- should_receive && GetParam ().stop_producing .ok () && GetParam ().input_finished .ok ();
1931- if (should_stop ) {
1932- ASSERT_FINISHES_AND_RAISES (Cancelled, plan->finished ());
1924+ bool should_finish =
1925+ should_start && GetParam ().input_received .ok () && GetParam ().input_finished .ok ();
1926+ if (should_finish ) {
1927+ ASSERT_FINISHES_OK ( plan->finished ());
19331928 } else {
19341929 ASSERT_FINISHES_AND_RAISES (Invalid, plan->finished ());
19351930 return ;
0 commit comments