Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,52 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
}
}

/** Like debounce, but group elements instead of dropping them
*
* @return A stream whose values contain the elements of this stream in order,
* and whose evaluation will force a delay `d` between emitting each value.
* The exact subsequence would depend on the chunk structure of this stream,
* and the timing they arrive.
*
* @example {{{
* scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global
* scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6)
* scala> val s2 = s.debounceAccumulate(100.milliseconds)
* scala> s2.compile.toVector.unsafeRunSync()
* res0: Vector[Chunk[Int]] = Vector(Chunk(1, 2, 3), Chunk(4, 5, 6))
* }}}
*/
def debounceAccumulate[F2[x] >: F[x]](
d: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] =
Stream.force {
for {
chan <- Channel.bounded[F2, Chunk[O]](1)
ref <- F.ref[Vector[O]](Vector.empty)
} yield {
val sendLatest: F2[Unit] =
ref
.getAndSet(Vector.empty)
.flatMap(l => F.whenA(l.nonEmpty)(chan.send(Chunk.indexedSeq(l))))

def sendItem(o: O): F2[Unit] =
ref.getAndUpdate(_ :+ o).flatMap {
case Vector() => (F.sleep(d) >> sendLatest).start.void
case _ => F.unit
}

def go(tl: Pull[F2, O, Unit]): Pull[F2, Nothing, Unit] =
Pull.uncons(tl).flatMap {
case Some((hd, tl)) => Pull.eval(hd.traverse_(sendItem)) >> go(tl)
case None => Pull.eval(sendLatest >> chan.close.void)
}

val debouncedSend: Stream[F2, Nothing] = new Stream(go(this.underlying))

chan.stream.concurrently(debouncedSend)
}
}

/** Throttles the stream to the specified `rate`. Unlike [[debounce]], [[metered]] doesn't drop elements.
*
* Provided `rate` should be viewed as maximum rate:
Expand Down