From 2540e6f14642535ac4bcded2396f6aa346f6e5a6 Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 11:34:23 +0900 Subject: [PATCH 01/10] Add accumulating variant of `Stream.debounce` --- core/shared/src/main/scala/fs2/Stream.scala | 44 +++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index bc0383b64c..36b8070e0a 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -663,6 +663,50 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, chan.stream.concurrently(debouncedSend) } } + + /** + * Like debounce, but group elements instead of dropping them + * + * @return A stream whose values contain the elements of this stream in order, + * and whose evaluation will force a delay `d` between emitting each value. + * The exact subsequence would depend on the chunk structure of this stream, + * and the timing they arrive. + * + * @example {{{ + * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global + * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) + * scala> val s2 = s.debounceAccumulate(100.milliseconds) + * scala> s2.compile.toVector.unsafeRunSync() + * res0: Vector[Int] = Vector(NonEmptyChain(1, 2, 3), NonEmptyChain(4, 5, 6)) + * }}} + */ + def debounceAccumulate[F2[x] >: F[x]](d: FiniteDuration)( + implicit F: Temporal[F2]): Stream[F2, NonEmptyChain[O]] = + Stream.force { + for { + chan <- Channel.bounded[F, NonEmptyChain[O]](1) + ref <- F.ref[Vector[O]](Vector.empty) + } yield { + val sendLatest: F[Unit] = + ref.getAndSet(Vector.empty).flatMap(l => NonEmptyChain.fromSeq(l).traverse_(chan.send)) + + def sendItem(o: O): F[Unit] = + ref.getAndUpdate(_ :+ o).flatMap { + case Vector() => (F.sleep(d) >> sendLatest).start.void + case _ => F.unit + } + + def go(tl: Stream[F, O]): Pull[F, Nothing, Unit] = + Pull.uncons(tl).flatMap { + case Some((hd, tl)) => Pull.eval(hd.traverse_(sendItem)) >> go(tl) + case None => Pull.eval(sendLatest >> chan.close.void) + } + + val debouncedSend: Stream[F, Nothing] = new Stream(go(this.underlying)) + + chan.stream.concurrently(debouncedSend) + } + } /** Throttles the stream to the specified `rate`. Unlike [[debounce]], [[metered]] doesn't drop elements. * From fef759ac48d97be70bef37f5c625041fb0664dd3 Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 11:42:48 +0900 Subject: [PATCH 02/10] Run scalafmt --- core/shared/src/main/scala/fs2/Stream.scala | 42 ++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 36b8070e0a..fbadc0297d 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -663,29 +663,29 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, chan.stream.concurrently(debouncedSend) } } - - /** - * Like debounce, but group elements instead of dropping them - * - * @return A stream whose values contain the elements of this stream in order, - * and whose evaluation will force a delay `d` between emitting each value. - * The exact subsequence would depend on the chunk structure of this stream, - * and the timing they arrive. - * - * @example {{{ - * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global - * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) - * scala> val s2 = s.debounceAccumulate(100.milliseconds) - * scala> s2.compile.toVector.unsafeRunSync() - * res0: Vector[Int] = Vector(NonEmptyChain(1, 2, 3), NonEmptyChain(4, 5, 6)) - * }}} - */ - def debounceAccumulate[F2[x] >: F[x]](d: FiniteDuration)( - implicit F: Temporal[F2]): Stream[F2, NonEmptyChain[O]] = + + /** Like debounce, but group elements instead of dropping them + * + * @return A stream whose values contain the elements of this stream in order, + * and whose evaluation will force a delay `d` between emitting each value. + * The exact subsequence would depend on the chunk structure of this stream, + * and the timing they arrive. + * + * @example {{{ + * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global + * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) + * scala> val s2 = s.debounceAccumulate(100.milliseconds) + * scala> s2.compile.toVector.unsafeRunSync() + * res0: Vector[Int] = Vector(NonEmptyChain(1, 2, 3), NonEmptyChain(4, 5, 6)) + * }}} + */ + def debounceAccumulate[F2[x] >: F[x]]( + d: FiniteDuration + )(implicit F: Temporal[F2]): Stream[F2, NonEmptyChain[O]] = Stream.force { for { chan <- Channel.bounded[F, NonEmptyChain[O]](1) - ref <- F.ref[Vector[O]](Vector.empty) + ref <- F.ref[Vector[O]](Vector.empty) } yield { val sendLatest: F[Unit] = ref.getAndSet(Vector.empty).flatMap(l => NonEmptyChain.fromSeq(l).traverse_(chan.send)) @@ -706,7 +706,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, chan.stream.concurrently(debouncedSend) } - } + } /** Throttles the stream to the specified `rate`. Unlike [[debounce]], [[metered]] doesn't drop elements. * From a5d376dc354d62509c34a0a2a156415c1f1828fa Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 11:48:36 +0900 Subject: [PATCH 03/10] Add import missed when transcribing --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index fbadc0297d..564017f784 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -25,7 +25,7 @@ import scala.annotation.{nowarn, tailrec} import scala.concurrent.TimeoutException import scala.concurrent.duration._ import cats.{Eval => _, _} -import cats.data.Ior +import cats.data.{Ior, NonEmptyChain} import cats.effect.Concurrent import cats.effect.kernel._ import cats.effect.kernel.implicits._ From d2ef7f124ac94f9d73f2c53d4fa8c8a308a5e57f Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 11:53:36 +0900 Subject: [PATCH 04/10] Remember to use F2 throughout --- core/shared/src/main/scala/fs2/Stream.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 564017f784..2a1499c1ec 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -684,25 +684,25 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, )(implicit F: Temporal[F2]): Stream[F2, NonEmptyChain[O]] = Stream.force { for { - chan <- Channel.bounded[F, NonEmptyChain[O]](1) + chan <- Channel.bounded[F2, NonEmptyChain[O]](1) ref <- F.ref[Vector[O]](Vector.empty) } yield { - val sendLatest: F[Unit] = + val sendLatest: F2[Unit] = ref.getAndSet(Vector.empty).flatMap(l => NonEmptyChain.fromSeq(l).traverse_(chan.send)) - def sendItem(o: O): F[Unit] = + def sendItem(o: O): F2[Unit] = ref.getAndUpdate(_ :+ o).flatMap { case Vector() => (F.sleep(d) >> sendLatest).start.void case _ => F.unit } - def go(tl: Stream[F, O]): Pull[F, Nothing, Unit] = + def go(tl: Stream[F2, O]): Pull[F2, Nothing, Unit] = Pull.uncons(tl).flatMap { case Some((hd, tl)) => Pull.eval(hd.traverse_(sendItem)) >> go(tl) case None => Pull.eval(sendLatest >> chan.close.void) } - val debouncedSend: Stream[F, Nothing] = new Stream(go(this.underlying)) + val debouncedSend: Stream[F2, Nothing] = new Stream(go(this.underlying)) chan.stream.concurrently(debouncedSend) } From 81e346587a89cd478dfa2f0a3b64a1da184e0bed Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 11:58:17 +0900 Subject: [PATCH 05/10] Correct pull plumbing --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 2a1499c1ec..74638c9365 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -696,7 +696,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, case _ => F.unit } - def go(tl: Stream[F2, O]): Pull[F2, Nothing, Unit] = + def go(tl: Pull[F2, O, Unit]): Pull[F2, Nothing, Unit] = Pull.uncons(tl).flatMap { case Some((hd, tl)) => Pull.eval(hd.traverse_(sendItem)) >> go(tl) case None => Pull.eval(sendLatest >> chan.close.void) From eb3f3ab463f28320d2d7b9218215de9e5bf9ffd8 Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 12:30:15 +0900 Subject: [PATCH 06/10] Correct type in documentation example --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 74638c9365..afda754a3e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -676,7 +676,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) * scala> val s2 = s.debounceAccumulate(100.milliseconds) * scala> s2.compile.toVector.unsafeRunSync() - * res0: Vector[Int] = Vector(NonEmptyChain(1, 2, 3), NonEmptyChain(4, 5, 6)) + * res0: Vector[NonEmptyChain[Int]] = Vector(NonEmptyChain(1, 2, 3), NonEmptyChain(4, 5, 6)) * }}} */ def debounceAccumulate[F2[x] >: F[x]]( From b2cbec8235215765256aab2f0cd6d9535561438b Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 12:38:40 +0900 Subject: [PATCH 07/10] Add missing import in doctest --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index afda754a3e..4c3b1a82bb 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -672,7 +672,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * and the timing they arrive. * * @example {{{ - * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global + * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global, cats.data.NonEmptyChain * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) * scala> val s2 = s.debounceAccumulate(100.milliseconds) * scala> s2.compile.toVector.unsafeRunSync() From d0045f37fa9e00d49e8b1c30d4074e644e78347a Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Thu, 27 Oct 2022 13:03:26 +0900 Subject: [PATCH 08/10] Update doctest for newtype leakage --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 4c3b1a82bb..f6aba7f46b 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -676,7 +676,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) * scala> val s2 = s.debounceAccumulate(100.milliseconds) * scala> s2.compile.toVector.unsafeRunSync() - * res0: Vector[NonEmptyChain[Int]] = Vector(NonEmptyChain(1, 2, 3), NonEmptyChain(4, 5, 6)) + * res0: Vector[NonEmptyChain[Int]] = Vector(Chain(1, 2, 3), Chain(4, 5, 6)) * }}} */ def debounceAccumulate[F2[x] >: F[x]]( From 526db01a915118175cf524fe606f0941bc3a99e1 Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Tue, 22 Nov 2022 06:17:17 +0900 Subject: [PATCH 09/10] Rework debounceAccumulate to use `Chunk` instead of `NonEmptyChain` --- core/shared/src/main/scala/fs2/Stream.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index f6aba7f46b..e6be94991c 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -25,7 +25,7 @@ import scala.annotation.{nowarn, tailrec} import scala.concurrent.TimeoutException import scala.concurrent.duration._ import cats.{Eval => _, _} -import cats.data.{Ior, NonEmptyChain} +import cats.data.Ior import cats.effect.Concurrent import cats.effect.kernel._ import cats.effect.kernel.implicits._ @@ -672,23 +672,23 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * and the timing they arrive. * * @example {{{ - * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global, cats.data.NonEmptyChain + * scala> import scala.concurrent.duration._, cats.effect.IO, cats.effect.unsafe.implicits.global * scala> val s = Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6) * scala> val s2 = s.debounceAccumulate(100.milliseconds) * scala> s2.compile.toVector.unsafeRunSync() - * res0: Vector[NonEmptyChain[Int]] = Vector(Chain(1, 2, 3), Chain(4, 5, 6)) + * res0: Vector[Chunk[Int]] = Vector(Chunk(1, 2, 3), Chunk(4, 5, 6)) * }}} */ def debounceAccumulate[F2[x] >: F[x]]( d: FiniteDuration - )(implicit F: Temporal[F2]): Stream[F2, NonEmptyChain[O]] = + )(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = Stream.force { for { - chan <- Channel.bounded[F2, NonEmptyChain[O]](1) + chan <- Channel.bounded[F2, Chunk[O]](1) ref <- F.ref[Vector[O]](Vector.empty) } yield { val sendLatest: F2[Unit] = - ref.getAndSet(Vector.empty).flatMap(l => NonEmptyChain.fromSeq(l).traverse_(chan.send)) + ref.getAndSet(Vector.empty).flatMap(l => F.whenA(l.nonEmpty)(chan.send(Chunk.indexedSeq(l)))) def sendItem(o: O): F2[Unit] = ref.getAndUpdate(_ :+ o).flatMap { From 0c07363e101d2782f82ef127528e2021af9be638 Mon Sep 17 00:00:00 2001 From: Mickey Donaghy Date: Tue, 22 Nov 2022 06:21:44 +0900 Subject: [PATCH 10/10] Run scalafmt --- core/shared/src/main/scala/fs2/Stream.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index e6be94991c..987663f40e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -688,7 +688,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, ref <- F.ref[Vector[O]](Vector.empty) } yield { val sendLatest: F2[Unit] = - ref.getAndSet(Vector.empty).flatMap(l => F.whenA(l.nonEmpty)(chan.send(Chunk.indexedSeq(l)))) + ref + .getAndSet(Vector.empty) + .flatMap(l => F.whenA(l.nonEmpty)(chan.send(Chunk.indexedSeq(l)))) def sendItem(o: O): F2[Unit] = ref.getAndUpdate(_ :+ o).flatMap {