From e7988ace05f72d25bc8a1a9d945edafe80cdccb6 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 18:36:55 +0200 Subject: [PATCH 1/8] Reproduce issue #4490 --- .../scala/cats/effect/std/Supervisor.scala | 273 +++++++++--------- .../cats/effect/std/SupervisorSpec.scala | 21 ++ 2 files changed, 165 insertions(+), 129 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index b47a583673..b3d7682b6d 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -141,7 +141,7 @@ object Supervisor { def apply[F[_]: Concurrent]: Resource[F, Supervisor[F]] = apply[F](false) - private sealed abstract class State[F[_]] { + private[std] sealed abstract class State[F[_]] { def remove(token: Unique.Token): F[Unit] @@ -150,6 +150,8 @@ object Supervisor { */ def add(token: Unique.Token, fiber: Fiber[F, Throwable, ?]): F[Boolean] + private[std] def numberOfFibers: F[Int] // for testing + // these are allowed to destroy the state, since they're only called during closing: val joinAll: F[Unit] val cancelAll: F[Unit] @@ -169,154 +171,161 @@ object Supervisor { case (st, _) => doneR.set(true) *> st.cancelAll } - } yield new Supervisor[F] { - - def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] = - F.uncancelable { _ => - val monitor: (F[A], F[Unit]) => F[Fiber[F, Throwable, A]] = checkRestart match { - case Some(restart) => { (fa, fin) => - F.deferred[Outcome[F, Throwable, A]] flatMap { resultR => - F.ref(false) flatMap { canceledR => - F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => - // `currentR` holds (a `Deferred` to) the current - // incarnation of the fiber executing `fa`: - F.ref(firstCurrent).flatMap { currentR => - def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = { - F uncancelable { _ => - val started = F start { - fa guaranteeCase { oc => - F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => - // we're replacing the `Deferred` holding - // the current fiber with a new one before - // the current fiber finishes, and even - // before we check for the cancel signal; - // this guarantees, that the fiber reachable - // through `currentR` is the last one (or - // null, see below): - currentR.set(newCurrent) *> { - canceledR.get flatMap { canceled => - doneR.get flatMap { done => - if (!canceled && !done && restart(oc)) { - action(newCurrent) - } else { - // we must complete `newCurrent`, - // because `cancel` below may wait - // for it; we signal that it is not - // restarted with `null`: - newCurrent.complete(null) *> fin.guarantee( - resultR.complete(oc).void) - } + } yield new SupervisorImpl[F](checkRestart, doneR, state) + } + + private[std] final class SupervisorImpl[F[_]]( + checkRestart: Option[Outcome[F, Throwable, ?] => Boolean], + doneR: Ref[F, Boolean], + private[std] val state: State[F], + )(implicit F: Concurrent[F]) extends Supervisor[F] { + + def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] = + F.uncancelable { _ => + val monitor: (F[A], F[Unit]) => F[Fiber[F, Throwable, A]] = checkRestart match { + case Some(restart) => { (fa, fin) => + F.deferred[Outcome[F, Throwable, A]] flatMap { resultR => + F.ref(false) flatMap { canceledR => + F.deferred[Fiber[F, Throwable, A]].flatMap { firstCurrent => + // `currentR` holds (a `Deferred` to) the current + // incarnation of the fiber executing `fa`: + F.ref(firstCurrent).flatMap { currentR => + def action(current: Deferred[F, Fiber[F, Throwable, A]]): F[Unit] = { + F uncancelable { _ => + val started = F start { + fa guaranteeCase { oc => + F.deferred[Fiber[F, Throwable, A]].flatMap { newCurrent => + // we're replacing the `Deferred` holding + // the current fiber with a new one before + // the current fiber finishes, and even + // before we check for the cancel signal; + // this guarantees, that the fiber reachable + // through `currentR` is the last one (or + // null, see below): + currentR.set(newCurrent) *> { + canceledR.get flatMap { canceled => + doneR.get flatMap { done => + if (!canceled && !done && restart(oc)) { + action(newCurrent) + } else { + // we must complete `newCurrent`, + // because `cancel` below may wait + // for it; we signal that it is not + // restarted with `null`: + newCurrent.complete(null) *> fin.guarantee( + resultR.complete(oc).void) } } } } } } - - started flatMap { f => current.complete(f).void } } + + started flatMap { f => current.complete(f).void } } + } - action(firstCurrent).as( - new Fiber[F, Throwable, A] { - - private[this] val delegateF = currentR.get.flatMap(_.get) - - val cancel: F[Unit] = F uncancelable { _ => - // after setting `canceledR`, at - // most one restart happens, and - // the fiber we get through `delegateF` - // is the final one: - canceledR.set(true) *> delegateF flatMap { - case null => - // ok, task wasn't restarted, but we - // wait for the result to be completed - // (and the finalizer to run): - resultR.get.void - case fiber => - fiber.cancel *> fiber.join flatMap { - case Outcome.Canceled() => - // cancel successful (or self-canceled), - // but we don't know if the `guaranteeCase` - // above ran so we need to double check: - delegateF.flatMap { - case null => - // ok, the `guaranteeCase` - // certainly executed/ing: - resultR.get.void - case fiber2 => - // we cancelled the fiber before it did - // anything, so the finalizer didn't run, - // we need to do it now: - val cleanup = fin.guarantee( - resultR.complete(Outcome.Canceled()).void - ) - if (fiber2 eq fiber) { - cleanup - } else { - // this should never happen - cleanup *> F.raiseError(new AssertionError( - "unexpected fiber (this is a bug in Supervisor)")) - } - } - case _ => - // finished in error/success, - // the outcome will certainly - // be completed: - resultR.get.void - } - } + action(firstCurrent).as( + new Fiber[F, Throwable, A] { + + private[this] val delegateF = currentR.get.flatMap(_.get) + + val cancel: F[Unit] = F uncancelable { _ => + // after setting `canceledR`, at + // most one restart happens, and + // the fiber we get through `delegateF` + // is the final one: + canceledR.set(true) *> delegateF flatMap { + case null => + // ok, task wasn't restarted, but we + // wait for the result to be completed + // (and the finalizer to run): + resultR.get.void + case fiber => + fiber.cancel *> fiber.join flatMap { + case Outcome.Canceled() => + // cancel successful (or self-canceled), + // but we don't know if the `guaranteeCase` + // above ran so we need to double check: + delegateF.flatMap { + case null => + // ok, the `guaranteeCase` + // certainly executed/ing: + resultR.get.void + case fiber2 => + // we cancelled the fiber before it did + // anything, so the finalizer didn't run, + // we need to do it now: + val cleanup = fin.guarantee( + resultR.complete(Outcome.Canceled()).void + ) + if (fiber2 eq fiber) { + cleanup + } else { + // this should never happen + cleanup *> F.raiseError(new AssertionError( + "unexpected fiber (this is a bug in Supervisor)")) + } + } + case _ => + // finished in error/success, + // the outcome will certainly + // be completed: + resultR.get.void + } } - - def join = resultR.get } - ) - } + + def join = resultR.get + } + ) } } } } - - case None => (fa, fin) => F.start(fa.guarantee(fin)) } - for { - done <- F.ref(false) - insertResult <- F.deferred[Boolean] - token <- F.unique - cleanup = state.remove(token) - fiber <- monitor( - // if the supervisor have been (or is now) - // shutting down, inserting into state will - // fail; so we need to wait for the positive result - // of inserting, before actually doing the task: - insertResult - .get - .ifM( - fa, - F.canceled *> F.raiseError[A](new AssertionError( - "supervised fiber couldn't cancel (this is a bug in Supervisor)")) - ), - done.set(true) *> cleanup - ) - insertOk <- state.add(token, fiber) - _ <- insertResult.complete(insertOk) - // `cleanup` could run BEFORE the `state.add` - // (if `fa` is very fast), in which case it doesn't - // remove the fiber from the state, so we re-check: - _ <- done.get.ifM(cleanup, F.unit) - _ <- { - if (!insertOk) { - F.raiseError(new IllegalStateException("supervisor already shutdown")) - } else { - F.unit - } - } - } yield fiber + case None => (fa, fin) => F.start(fa.guarantee(fin)) } - } + + for { + done <- F.ref(false) + insertResult <- F.deferred[Boolean] + token <- F.unique + cleanup = state.remove(token) + fiber <- monitor( + // if the supervisor have been (or is now) + // shutting down, inserting into state will + // fail; so we need to wait for the positive result + // of inserting, before actually doing the task: + insertResult + .get + .ifM( + fa, + F.canceled *> F.raiseError[A](new AssertionError( + "supervised fiber couldn't cancel (this is a bug in Supervisor)")) + ), + done.set(true) *> cleanup + ) + insertOk <- state.add(token, fiber) + _ <- insertResult.complete(insertOk) + // `cleanup` could run BEFORE the `state.add` + // (if `fa` is very fast), in which case it doesn't + // remove the fiber from the state, so we re-check: + _ <- done.get.ifM(cleanup, F.unit) + _ <- { + if (!insertOk) { + F.raiseError(new IllegalStateException("supervisor already shutdown")) + } else { + F.unit + } + } + } yield fiber + } } + private[effect] def applyForConcurrent[F[_]]( await: Boolean, checkRestart: Option[Outcome[F, Throwable, ?] => Boolean])( @@ -335,6 +344,9 @@ object Supervisor { case map => (map.updated(token, fiber), true) } + private[std] final override def numberOfFibers: F[Int] = // for testing + stateRef.get.map(_.size) + private[this] val allFibers: F[List[Fiber[F, Throwable, ?]]] = { // we're closing, so we won't need the state any more, // so we're using `null` as a sentinel to reject later @@ -371,6 +383,9 @@ object Supervisor { F.delay(state.put(token, fiber)) *> doneR.get.map(!_) } + private[std] final override def numberOfFibers: F[Int] = // for testing + F.delay { state.size() } + private[this] val allFibers: F[List[Fiber[F, Throwable, ?]]] = F delay { val fibers = ListBuffer.empty[Fiber[F, Throwable, ?]] diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 0065f87b33..d23a4d18ae 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -294,5 +294,26 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { tsk.parReplicateA_(if (isJVM) 1000 else 1).as(ok) } + + def superviseCancelRace(mkSupervisor: Resource[IO, Supervisor[IO]]) = { + val N = 1000 + val M = 20 + val tsk = mkSupervisor.use { supervisor => + supervisor.supervise(IO.unit).flatMap(_.cancel).replicateA_(N).parReplicateA_(M).flatMap { _ => + // let's wait for cleanup to happen: + IO.sleep(1.second) *> { + val st = supervisor.asInstanceOf[Supervisor.SupervisorImpl[IO]].state + st.numberOfFibers.flatMap { numFibs => + IO(numFibs mustEqual 0) + } + } + } + } + tsk.as(ok) + } + + "supervise / cancel race cleanup" in real { + superviseCancelRace(constructor(false, None)) + } } } From e0aac4ce0bd14cfd77531a292d1d7a92b550a7d5 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 18:47:41 +0200 Subject: [PATCH 2/8] More tests --- .../src/test/scala/cats/effect/std/SupervisorSpec.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index d23a4d18ae..de82b7f858 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -315,5 +315,9 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { "supervise / cancel race cleanup" in real { superviseCancelRace(constructor(false, None)) } + + "supervise / cancel race cleanup (with restart)" in real { + superviseCancelRace(constructor(false, Some(_ => true))) + } } } From e07bc858355f03573dd1f7471edf671698fdfa77 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 18:55:49 +0200 Subject: [PATCH 3/8] Fix it --- .../src/main/scala/cats/effect/std/Supervisor.scala | 8 +++++++- .../src/test/scala/cats/effect/std/SupervisorSpec.scala | 5 +++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index b3d7682b6d..24bb46fd3e 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -286,7 +286,13 @@ object Supervisor { } } - case None => (fa, fin) => F.start(fa.guarantee(fin)) + case None => { (fa, fin) => + F.start(fa).flatMap { fib => + F.start(F.uncancelable { _ => + fib.join.guarantee(fin) + }).as(fib) + } + } } for { diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index de82b7f858..b5da6f29cc 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -300,9 +300,10 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { val M = 20 val tsk = mkSupervisor.use { supervisor => supervisor.supervise(IO.unit).flatMap(_.cancel).replicateA_(N).parReplicateA_(M).flatMap { _ => - // let's wait for cleanup to happen: - IO.sleep(1.second) *> { + // let's wait a bit (for cleanup to happen): + IO.sleep(0.2.second) *> { val st = supervisor.asInstanceOf[Supervisor.SupervisorImpl[IO]].state + // the supervised fibers must've been cleaned up from the internal state: st.numberOfFibers.flatMap { numFibs => IO(numFibs mustEqual 0) } From 2ec6927231d30d33e24aad482de7f226d7bd2bb1 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 18:57:42 +0200 Subject: [PATCH 4/8] scalafmt --- .../scala/cats/effect/std/Supervisor.scala | 14 ++++++-------- .../cats/effect/std/SupervisorSpec.scala | 19 +++++++++++-------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 24bb46fd3e..13e7409ac3 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -175,10 +175,11 @@ object Supervisor { } private[std] final class SupervisorImpl[F[_]]( - checkRestart: Option[Outcome[F, Throwable, ?] => Boolean], - doneR: Ref[F, Boolean], - private[std] val state: State[F], - )(implicit F: Concurrent[F]) extends Supervisor[F] { + checkRestart: Option[Outcome[F, Throwable, ?] => Boolean], + doneR: Ref[F, Boolean], + private[std] val state: State[F] + )(implicit F: Concurrent[F]) + extends Supervisor[F] { def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]] = F.uncancelable { _ => @@ -288,9 +289,7 @@ object Supervisor { case None => { (fa, fin) => F.start(fa).flatMap { fib => - F.start(F.uncancelable { _ => - fib.join.guarantee(fin) - }).as(fib) + F.start(F.uncancelable { _ => fib.join.guarantee(fin) }).as(fib) } } } @@ -331,7 +330,6 @@ object Supervisor { } } - private[effect] def applyForConcurrent[F[_]]( await: Boolean, checkRestart: Option[Outcome[F, Throwable, ?] => Boolean])( diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index b5da6f29cc..9b273cc941 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -299,16 +299,19 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { val N = 1000 val M = 20 val tsk = mkSupervisor.use { supervisor => - supervisor.supervise(IO.unit).flatMap(_.cancel).replicateA_(N).parReplicateA_(M).flatMap { _ => - // let's wait a bit (for cleanup to happen): - IO.sleep(0.2.second) *> { - val st = supervisor.asInstanceOf[Supervisor.SupervisorImpl[IO]].state - // the supervised fibers must've been cleaned up from the internal state: - st.numberOfFibers.flatMap { numFibs => - IO(numFibs mustEqual 0) + supervisor + .supervise(IO.unit) + .flatMap(_.cancel) + .replicateA_(N) + .parReplicateA_(M) + .flatMap { _ => + // let's wait a bit (for cleanup to happen): + IO.sleep(0.2.second) *> { + val st = supervisor.asInstanceOf[Supervisor.SupervisorImpl[IO]].state + // the supervised fibers must've been cleaned up from the internal state: + st.numberOfFibers.flatMap { numFibs => IO(numFibs mustEqual 0) } } } - } } tsk.as(ok) } From accf623b80f3c4d2929a3f82c74ce02090a117a9 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 19:07:13 +0200 Subject: [PATCH 5/8] Cleanup --- std/shared/src/main/scala/cats/effect/std/Supervisor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index 13e7409ac3..dd7d14ffd6 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -289,7 +289,7 @@ object Supervisor { case None => { (fa, fin) => F.start(fa).flatMap { fib => - F.start(F.uncancelable { _ => fib.join.guarantee(fin) }).as(fib) + F.start(fib.join.guarantee(fin)).as(fib) } } } From 0cb21694551ed394faa3589f1c429e3e927867ba Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 19:07:50 +0200 Subject: [PATCH 6/8] scalafmt --- std/shared/src/main/scala/cats/effect/std/Supervisor.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala index dd7d14ffd6..2302f11c2d 100644 --- a/std/shared/src/main/scala/cats/effect/std/Supervisor.scala +++ b/std/shared/src/main/scala/cats/effect/std/Supervisor.scala @@ -288,9 +288,7 @@ object Supervisor { } case None => { (fa, fin) => - F.start(fa).flatMap { fib => - F.start(fib.join.guarantee(fin)).as(fib) - } + F.start(fa).flatMap { fib => F.start(fib.join.guarantee(fin)).as(fib) } } } From 52cb20349ab72051842370b03c1f5dc3f74f5523 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 19:31:57 +0200 Subject: [PATCH 7/8] mima --- build.sbt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index df3c905456..f5ce144209 100644 --- a/build.sbt +++ b/build.sbt @@ -1104,7 +1104,10 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[FinalMethodProblem]( "cats.effect.std.Dispatcher#RegState#Unstarted.toString"), ProblemFilters.exclude[DirectMissingMethodProblem]( - "cats.effect.std.Dispatcher#Registration#Primary.*") + "cats.effect.std.Dispatcher#Registration#Primary.*"), + // #4500, private class: + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "cats.effect.std.Supervisor#State.numberOfFibers") ) ) .jsSettings( From 5e9b6172d8682e079eb2f98a63d26d9fafb31377 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 8 Oct 2025 20:18:08 +0200 Subject: [PATCH 8/8] Try to fix test timeout on JS --- .../src/test/scala/cats/effect/std/SupervisorSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala index 9b273cc941..db6c0fa609 100644 --- a/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/SupervisorSpec.scala @@ -296,8 +296,8 @@ class SupervisorSpec extends BaseSpec with DetectPlatform { } def superviseCancelRace(mkSupervisor: Resource[IO, Supervisor[IO]]) = { - val N = 1000 - val M = 20 + val N = if (isJVM) 1000 else 5 + val M = if (isJVM) 20 else 2 val tsk = mkSupervisor.use { supervisor => supervisor .supervise(IO.unit)