@@ -1552,6 +1552,149 @@ TEST(AsofJoinTest, BackpressureWithBatches) {
15521552 /* num_r0_batches=*/ 50 , /* num_r1_batches=*/ 20 , /* slow_r0=*/ true );
15531553}
15541554
1555+ TEST (AsofJoinTest, PauseProducingAsofJoinSource) {
1556+ int batch_size = 1 ;
1557+ auto make_shift = [batch_size](int num_batches, const std::shared_ptr<Schema>& schema,
1558+ int shift) {
1559+ return MakeIntegerBatches (
1560+ {[](int row) -> int64_t { return row; },
1561+ [num_batches](int row) -> int64_t { return row / num_batches; },
1562+ [shift](int row) -> int64_t { return row * 10 + shift; }},
1563+ schema, num_batches, batch_size);
1564+ };
1565+ auto l_schema =
1566+ schema ({field (" time" , int64 ()), field (" key" , int64 ()), field (" l_value" , int64 ())});
1567+ auto r_schema =
1568+ schema ({field (" time" , int64 ()), field (" key" , int64 ()), field (" r0_value" , int64 ())});
1569+
1570+ auto output_schema =
1571+ schema ({field (" time" , int64 ()), field (" key" , int64 ()), field (" l_value" , int64 ()),
1572+ field (" key" , int64 ()), field (" r0_value" , int64 ())});
1573+
1574+ ASSERT_OK_AND_ASSIGN (auto out_batch,
1575+ MakeIntegerBatches ({[](int row) -> int64_t { return row; },
1576+ [](int row) -> int64_t { return row; },
1577+ [](int row) -> int64_t { return row / 20 ; },
1578+ [](int row) -> int64_t { return row / 20 ; },
1579+ [](int row) -> int64_t { return row * 10 ; }},
1580+ output_schema, 20 , batch_size))
1581+
1582+ ASSERT_OK_AND_ASSIGN (auto l_batches, make_shift (50 , l_schema, 2 ));
1583+ ASSERT_OK_AND_ASSIGN (auto r0_batches, make_shift (50 , r_schema, 1 ));
1584+ std::optional<ExecBatch> out = out_batch.batches [0 ];
1585+
1586+ constexpr uint32_t thresholdOfBackpressure = 8 ;
1587+ constexpr uint32_t kPauseIfAbove = 4 ;
1588+ constexpr uint32_t kResumeIfBelow = 2 ;
1589+ uint32_t pause_if_above_bytes =
1590+ kPauseIfAbove * static_cast <uint32_t >(out->TotalBufferSize ());
1591+ uint32_t resume_if_below_bytes =
1592+ kResumeIfBelow * static_cast <uint32_t >(out->TotalBufferSize ());
1593+ EXPECT_OK_AND_ASSIGN (std::shared_ptr<ExecPlan> plan, ExecPlan::Make ());
1594+ PushGenerator<std::optional<ExecBatch>> batch_producer_left;
1595+ PushGenerator<std::optional<ExecBatch>> batch_producer_right;
1596+
1597+ AsyncGenerator<std::optional<ExecBatch>> sink_gen;
1598+ BackpressureMonitor* backpressure_monitor;
1599+ BackpressureOptions backpressure_options (resume_if_below_bytes, pause_if_above_bytes);
1600+ std::shared_ptr<Schema> schema_ = schema ({field (" data" , uint32 ())});
1601+
1602+ BackpressureCountingNode::Register ();
1603+
1604+ Declaration left{" source" , SourceNodeOptions (l_schema, batch_producer_left)};
1605+ Declaration right{" source" , SourceNodeOptions (r_schema, batch_producer_right)};
1606+ AsofJoinNodeOptions asof_join_opts ({{{" time" }, {}}, {{" time" }, {}}}, 1 );
1607+
1608+ BackpressureCounters bp_countersl, bp_countersr;
1609+ BackpressureCountingNode::Register ();
1610+
1611+ Declaration left_count{" backpressure_count" ,
1612+ {std::move (left)},
1613+ BackpressureCountingNodeOptions (&bp_countersl)};
1614+
1615+ Declaration right_count{" backpressure_count" ,
1616+ {std::move (right)},
1617+ BackpressureCountingNodeOptions (&bp_countersr)};
1618+
1619+ Declaration asof_join{" asofjoin" ,
1620+ {std::move (left_count), std::move (right_count)},
1621+ std::move (asof_join_opts)};
1622+
1623+ ARROW_EXPECT_OK (
1624+ acero::Declaration::Sequence (
1625+ {
1626+ std::move (asof_join),
1627+ {" sink" , SinkNodeOptions{&sink_gen, /* schema=*/ nullptr ,
1628+ backpressure_options, &backpressure_monitor}},
1629+ })
1630+ .AddToPlan (plan.get ()));
1631+
1632+ ASSERT_TRUE (backpressure_monitor);
1633+ plan->StartProducing ();
1634+ auto fut = plan->finished ();
1635+
1636+ ASSERT_FALSE (backpressure_monitor->is_paused ());
1637+
1638+ auto has_bp_been_applied = [&] {
1639+ for (size_t i = 0 ; i < 2 ; i++) {
1640+ const auto & counters = (i == 0 ) ? bp_countersl : bp_countersr;
1641+ if (counters.pause_count > 0 ) return true ;
1642+ }
1643+ return false ;
1644+ };
1645+
1646+ // Should be able to push kPauseIfAbove batches without triggering back pressure
1647+ uint32_t cnt = 0 ;
1648+ for (uint32_t i = 0 ; i < kPauseIfAbove ; i++) {
1649+ batch_producer_left.producer ().Push (l_batches.batches [i]);
1650+ batch_producer_right.producer ().Push (r0_batches.batches [i]);
1651+ cnt = i;
1652+ }
1653+ cnt++;
1654+
1655+ SleepABit ();
1656+ ASSERT_FALSE (backpressure_monitor->is_paused ());
1657+
1658+ // One more batch should trigger back pressure
1659+ batch_producer_right.producer ().Push (r0_batches.batches [cnt]);
1660+ batch_producer_left.producer ().Push (l_batches.batches [cnt]);
1661+
1662+ BusyWait (10 , [&] { return backpressure_monitor->is_paused (); });
1663+ ASSERT_TRUE (backpressure_monitor->is_paused ());
1664+
1665+ // Fill up the inputs of the asof join node
1666+ cnt++;
1667+ for (uint32_t i = cnt; i < thresholdOfBackpressure + cnt; i++) {
1668+ batch_producer_left.producer ().Push (l_batches.batches [i]);
1669+ batch_producer_right.producer ().Push (r0_batches.batches [i]);
1670+ }
1671+
1672+ BusyWait (20.0 , has_bp_been_applied);
1673+ ASSERT_TRUE (has_bp_been_applied ());
1674+
1675+ // Reading as much as we can while keeping it paused
1676+ for (uint32_t i = kPauseIfAbove ; i >= kResumeIfBelow ; i--) {
1677+ ASSERT_FINISHES_OK (sink_gen ());
1678+ }
1679+ SleepABit ();
1680+ ASSERT_TRUE (backpressure_monitor->is_paused ());
1681+
1682+ // Reading one more item should open up backpressure
1683+ ASSERT_FINISHES_OK (sink_gen ());
1684+ BusyWait (10 , [&] { return !backpressure_monitor->is_paused (); });
1685+ ASSERT_FALSE (backpressure_monitor->is_paused ());
1686+
1687+ // Cleanup
1688+ batch_producer_left.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1689+ batch_producer_right.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1690+
1691+ plan->StopProducing ();
1692+
1693+ ASSERT_TRUE (fut.Wait (kDefaultAssertFinishesWaitSeconds ));
1694+ if (!fut.status ().ok ()) {
1695+ ASSERT_TRUE (fut.status ().IsCancelled ());
1696+ }
1697+ }
15551698template <typename BatchesMaker>
15561699void TestSequencing (BatchesMaker maker, int num_batches, int batch_size) {
15571700 auto l_schema =
0 commit comments