diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index bc0383b64c..987663f40e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -664,6 +664,52 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } } + /** Like debounce, but group elements instead of dropping them + * + * @return A stream whose values contain the elements of this stream in order, + * and whose evaluation will force a delay `d` between emitting each value. + * The exact subsequence would depend on the chunk structure of this stream, + * and the timing they arrive. + * + * @example {{{ + * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global + * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) + * scala> val s2 = s.debounceAccumulate(100.milliseconds) + * scala> s2.compile.toVector.unsafeRunSync() + * res0: Vector[Chunk[Int]] = Vector(Chunk(1, 2, 3), Chunk(4, 5, 6)) + * }}} + */ + def debounceAccumulate[F2[x] >: F[x]]( + d: FiniteDuration + )(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = + Stream.force { + for { + chan <- Channel.bounded[F2, Chunk[O]](1) + ref <- F.ref[Vector[O]](Vector.empty) + } yield { + val sendLatest: F2[Unit] = + ref + .getAndSet(Vector.empty) + .flatMap(l => F.whenA(l.nonEmpty)(chan.send(Chunk.indexedSeq(l)))) + + def sendItem(o: O): F2[Unit] = + ref.getAndUpdate(_ :+ o).flatMap { + case Vector() => (F.sleep(d) >> sendLatest).start.void + case _ => F.unit + } + + def go(tl: Pull[F2, O, Unit]): Pull[F2, Nothing, Unit] = + Pull.uncons(tl).flatMap { + case Some((hd, tl)) => Pull.eval(hd.traverse_(sendItem)) >> go(tl) + case None => Pull.eval(sendLatest >> chan.close.void) + } + + val debouncedSend: Stream[F2, Nothing] = new Stream(go(this.underlying)) + + chan.stream.concurrently(debouncedSend) + } + } + /** Throttles the stream to the specified `rate`. Unlike [[debounce]], [[metered]] doesn't drop elements. * * Provided `rate` should be viewed as maximum rate: