From 1953a1a9892dd03421bac3c4ec65fc074b625ff6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:57:30 +0000 Subject: [PATCH 1/7] Initial plan From 5b933d250d33b9f5597421ba2f978bc70c35ec4d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 18:06:36 +0000 Subject: [PATCH 2/7] Implement DelayedQueueInMemory Scala wrapper with tests - Created DelayedQueueInMemory.scala wrapping JVM implementation - Implemented all DelayedQueue trait methods with proper type conversions - Added CronService wrapper for cron functionality - Created comprehensive test suite (19 tests, all passing) - Follows existing patterns (asScala/asJava conversions) - Uses Cats Effect IO for side effects management Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../scala/DelayedQueueInMemory.scala | 226 ++++++++++++++++++ .../scala/DelayedQueueInMemorySpec.scala | 225 +++++++++++++++++ 2 files changed, 451 insertions(+) create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala new file mode 100644 index 0000000..d27bdb1 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -0,0 +1,226 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.IO +import cats.syntax.functor.* +import java.time.{Clock as JavaClock, Instant} +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.AckEnvelope.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.BatchedReply.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import scala.jdk.CollectionConverters.* + +/** In-memory implementation of [[DelayedQueue]] using concurrent data structures. + * + * This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueInMemory]] and + * provides an idiomatic Scala API with Cats Effect IO for managing side effects. + * + * The underlying implementation uses a ReentrantLock to protect mutable state (compatible with + * virtual threads) and condition variables for efficient blocking in `poll`. + * + * ==Example== + * + * {{{ + * import cats.effect.IO + * import java.time.Instant + * + * val queue = DelayedQueueInMemory.create[String]() + * + * for { + * _ <- queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) + * envelope <- queue.poll + * _ <- IO.println(s"Received: \${envelope.payload}") + * _ <- envelope.acknowledge + * } yield () + * }}} + * + * @tparam A + * the type of message payloads + */ +object DelayedQueueInMemory { + + /** Creates an in-memory delayed queue with default configuration. + * + * @tparam A + * the type of message payloads + * @param timeConfig + * time configuration (defaults to [[DelayedQueueTimeConfig.DEFAULT_IN_MEMORY]]) + * @param ackEnvSource + * source identifier for envelopes (defaults to "delayed-queue-inmemory") + * @param clock + * clock for time operations (defaults to system UTC clock) + * @return + * a new DelayedQueue instance + */ + def create[A]( + timeConfig: DelayedQueueTimeConfig = DelayedQueueTimeConfig.DEFAULT_IN_MEMORY, + ackEnvSource: String = "delayed-queue-inmemory", + clock: JavaClock = JavaClock.systemUTC() + ): DelayedQueue[A] = { + val jvmQueue = jvm.DelayedQueueInMemory.create[A]( + timeConfig.asJava, + ackEnvSource, + clock + ) + new DelayedQueueInMemoryWrapper(jvmQueue) + } + + /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. + */ + private class DelayedQueueInMemoryWrapper[A](underlying: jvm.DelayedQueueInMemory[A]) + extends DelayedQueue[A] { + + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): IO[List[BatchedReply[In, A]]] = + IO { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = IO.blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: IO[AckEnvelope[A]] = + IO.interruptible { + underlying.poll().asScala + } + + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) + + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) + + override def getCron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) + } + + /** Wrapper for CronService that delegates to the JVM implementation. */ + private class CronServiceWrapper[A](underlying: jvm.CronService[A]) extends CronService[A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): IO[Unit] = + IO { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: CronMessageBatchGenerator[A] + ): cats.effect.Resource[IO, Unit] = { + import cats.effect.Resource + + val acquire = IO { + val javaGenerator = new jvm.CronMessageBatchGenerator[A] { + override def invoke(now: Instant): java.util.List[jvm.CronMessage[A]] = + generateMany(now).map(_.asJava).asJava + } + underlying.install(configHash.asJava, keyPrefix, scheduleInterval, javaGenerator) + } + + val release = (closeable: AutoCloseable) => IO(closeable.close()) + + Resource.make(acquire)(release).void + } + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: CronMessageGenerator[A] + ): cats.effect.Resource[IO, Unit] = { + import cats.effect.Resource + + val acquire = IO { + val javaGenerator = new jvm.CronMessageGenerator[A] { + override def invoke(at: Instant): jvm.CronMessage[A] = + generator(at).asJava + } + underlying.installDailySchedule(keyPrefix, schedule.asJava, javaGenerator) + } + + val release = (closeable: AutoCloseable) => IO(closeable.close()) + + Resource.make(acquire)(release).void + } + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: CronPayloadGenerator[A] + ): cats.effect.Resource[IO, Unit] = { + import cats.effect.Resource + + val acquire = IO { + val javaGenerator = new jvm.CronPayloadGenerator[A] { + override def invoke(at: Instant): A = generator(at) + } + underlying.installPeriodicTick(keyPrefix, period, javaGenerator) + } + + val release = (closeable: AutoCloseable) => IO(closeable.close()) + + Resource.make(acquire)(release).void + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala new file mode 100644 index 0000000..f8485b5 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala @@ -0,0 +1,225 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import java.time.Instant +import scala.concurrent.duration.* + +class DelayedQueueInMemorySpec extends munit.FunSuite { + + test("create should return a working queue") { + val queue = DelayedQueueInMemory.create[String]() + val result = queue.getTimeConfig.unsafeRunSync() + assertEquals(result, DelayedQueueTimeConfig.DEFAULT_IN_MEMORY) + } + + test("offerOrUpdate should create a new message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + val result = queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + assertEquals(result, OfferOutcome.Created) + } + + test("offerOrUpdate should update an existing message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val result = queue.offerOrUpdate("key1", "payload2", scheduleAt.plusSeconds(5)).unsafeRunSync() + + assertEquals(result, OfferOutcome.Updated) + } + + test("offerIfNotExists should create a new message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + val result = queue.offerIfNotExists("key1", "payload1", scheduleAt).unsafeRunSync() + assertEquals(result, OfferOutcome.Created) + } + + test("offerIfNotExists should ignore existing message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + queue.offerIfNotExists("key1", "payload1", scheduleAt).unsafeRunSync() + val result = + queue.offerIfNotExists("key1", "payload2", scheduleAt.plusSeconds(5)).unsafeRunSync() + + assertEquals(result, OfferOutcome.Ignored) + } + + test("tryPoll should return None when no messages are available") { + val queue = DelayedQueueInMemory.create[String]() + val result = queue.tryPoll.unsafeRunSync() + assertEquals(result, None) + } + + test("tryPoll should return a message when scheduled time has passed") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().minusSeconds(1) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val envelope = queue.tryPoll.unsafeRunSync() + + assert(envelope.isDefined) + assertEquals(envelope.get.payload, "payload1") + assertEquals(envelope.get.messageId.value, "key1") + } + + test("tryPollMany should return empty list when no messages are available") { + val queue = DelayedQueueInMemory.create[String]() + val envelope = queue.tryPollMany(10).unsafeRunSync() + assertEquals(envelope.payload, List.empty[String]) + } + + test("tryPollMany should return multiple messages") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().minusSeconds(1) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + queue.offerOrUpdate("key2", "payload2", scheduleAt).unsafeRunSync() + queue.offerOrUpdate("key3", "payload3", scheduleAt).unsafeRunSync() + + val envelope = queue.tryPollMany(5).unsafeRunSync() + + assertEquals(envelope.payload.length, 3) + assert(envelope.payload.contains("payload1")) + assert(envelope.payload.contains("payload2")) + assert(envelope.payload.contains("payload3")) + } + + test("offerBatch should handle multiple messages") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + val messages = List( + BatchedMessage("input1", ScheduledMessage("key1", "payload1", scheduleAt, canUpdate = true)), + BatchedMessage("input2", ScheduledMessage("key2", "payload2", scheduleAt, canUpdate = true)) + ) + + val replies = queue.offerBatch(messages).unsafeRunSync() + + assertEquals(replies.length, 2) + assertEquals(replies(0).outcome, OfferOutcome.Created) + assertEquals(replies(1).outcome, OfferOutcome.Created) + } + + test("read should return a message without locking it") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val envelope = queue.read("key1").unsafeRunSync() + + assert(envelope.isDefined) + assertEquals(envelope.get.payload, "payload1") + + // Message should still exist + val stillExists = queue.containsMessage("key1").unsafeRunSync() + assert(stillExists) + } + + test("dropMessage should remove a message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val dropped = queue.dropMessage("key1").unsafeRunSync() + + assert(dropped) + + val exists = queue.containsMessage("key1").unsafeRunSync() + assert(!exists) + } + + test("containsMessage should return true for existing message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val exists = queue.containsMessage("key1").unsafeRunSync() + + assert(exists) + } + + test("containsMessage should return false for non-existing message") { + val queue = DelayedQueueInMemory.create[String]() + val exists = queue.containsMessage("nonexistent").unsafeRunSync() + assert(!exists) + } + + test("dropAllMessages should remove all messages") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().plusSeconds(10) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + queue.offerOrUpdate("key2", "payload2", scheduleAt).unsafeRunSync() + + val count = queue.dropAllMessages("Yes, please, I know what I'm doing!").unsafeRunSync() + + assertEquals(count, 2) + + val exists1 = queue.containsMessage("key1").unsafeRunSync() + val exists2 = queue.containsMessage("key2").unsafeRunSync() + assert(!exists1) + assert(!exists2) + } + + test("acknowledge should delete the message") { + val queue = DelayedQueueInMemory.create[String]() + val scheduleAt = Instant.now().minusSeconds(1) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val envelope = queue.tryPoll.unsafeRunSync() + + assert(envelope.isDefined) + envelope.get.acknowledge.unsafeRunSync() + + // Message should be deleted after acknowledgment + val exists = queue.containsMessage("key1").unsafeRunSync() + assert(!exists) + } + + test("getCron should return a CronService") { + val queue = DelayedQueueInMemory.create[String]() + val cronService = queue.getCron.unsafeRunSync() + assert(cronService != null) + } + + test("custom timeConfig should be used") { + val customConfig = DelayedQueueTimeConfig( + acquireTimeout = 60.seconds, + pollPeriod = 200.milliseconds + ) + val queue = DelayedQueueInMemory.create[String](timeConfig = customConfig) + val result = queue.getTimeConfig.unsafeRunSync() + assertEquals(result, customConfig) + } + + test("custom ackEnvSource should be used") { + val queue = DelayedQueueInMemory.create[String](ackEnvSource = "custom-source") + val scheduleAt = Instant.now().minusSeconds(1) + + queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() + val envelope = queue.tryPoll.unsafeRunSync() + + assert(envelope.isDefined) + assertEquals(envelope.get.source, "custom-source") + } +} From abfbf63e51c1da3e6c89d52f99ec487333ca98fe Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 12:36:05 +0200 Subject: [PATCH 3/7] Change to F[_] --- build.sbt | 9 +- .../scala/DelayedQueueInMemory.scala | 226 ---------------- .../delayedqueue/scala/AckEnvelope.scala | 10 +- .../delayedqueue/scala/CronConfigHash.scala | 0 .../scala/CronDailySchedule.scala | 0 .../delayedqueue/scala/CronMessage.scala | 0 .../delayedqueue/scala/CronService.scala | 39 ++- .../delayedqueue/scala/DelayedQueue.scala | 27 +- .../scala/DelayedQueueInMemory.scala | 246 ++++++++++++++++++ .../scala/DelayedQueueJDBCConfig.scala | 0 .../scala/DelayedQueueTimeConfig.scala | 0 .../scala/JdbcConnectionConfig.scala | 0 .../scala/JdbcDatabasePoolConfig.scala | 0 .../delayedqueue/scala/JdbcDriver.scala | 0 .../delayedqueue/scala/OfferOutcome.scala | 0 .../delayedqueue/scala/RetryConfig.scala | 0 .../delayedqueue/scala/ScheduledMessage.scala | 0 .../delayedqueue/scala/exceptions.scala | 0 .../org/funfix/delayedqueue/scala/hello.scala | 0 .../funfix/delayedqueue/scala/CronSpec.scala | 0 .../scala/DataStructuresPropertySpec.scala | 0 .../scala/DelayedQueueInMemorySpec.scala | 0 .../delayedqueue/scala/Generators.scala | 0 .../delayedqueue/scala/HelloSuite.scala | 0 .../delayedqueue/scala/JdbcDriverSpec.scala | 0 .../delayedqueue/scala/RetryConfigSpec.scala | 0 .../scala/ScheduledMessageSpec.scala | 0 project/plugins.sbt | 4 - 28 files changed, 285 insertions(+), 276 deletions(-) delete mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala (95%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala (87%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala (87%) create mode 100644 delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala (100%) rename delayedqueue-scala/{jvm => }/src/main/scala/org/funfix/delayedqueue/scala/hello.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala (100%) rename delayedqueue-scala/{jvm => }/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala (100%) diff --git a/build.sbt b/build.sbt index 7f517ab..76ddf03 100644 --- a/build.sbt +++ b/build.sbt @@ -64,13 +64,10 @@ lazy val root = project ) .aggregate(delayedqueueJVM) -lazy val delayedqueue = crossProject(JVMPlatform) - .crossType(CrossType.Full) +lazy val delayedqueueJVM = project .in(file("delayedqueue-scala")) .settings( - name := "delayedqueue-scala" - ) - .jvmSettings( + name := "delayedqueue-scala", libraryDependencies ++= Seq( "org.funfix" % "delayedqueue-jvm" % version.value, "org.typelevel" %% "cats-effect" % "3.6.3", @@ -81,5 +78,3 @@ lazy val delayedqueue = crossProject(JVMPlatform) "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, ) ) - -lazy val delayedqueueJVM = delayedqueue.jvm diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala deleted file mode 100644 index d27bdb1..0000000 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright 2026 Alexandru Nedelcu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.funfix.delayedqueue.scala - -import cats.effect.IO -import cats.syntax.functor.* -import java.time.{Clock as JavaClock, Instant} -import org.funfix.delayedqueue.jvm -import org.funfix.delayedqueue.scala.AckEnvelope.asScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala -import org.funfix.delayedqueue.scala.BatchedReply.asScala -import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala -import scala.jdk.CollectionConverters.* - -/** In-memory implementation of [[DelayedQueue]] using concurrent data structures. - * - * This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueInMemory]] and - * provides an idiomatic Scala API with Cats Effect IO for managing side effects. - * - * The underlying implementation uses a ReentrantLock to protect mutable state (compatible with - * virtual threads) and condition variables for efficient blocking in `poll`. - * - * ==Example== - * - * {{{ - * import cats.effect.IO - * import java.time.Instant - * - * val queue = DelayedQueueInMemory.create[String]() - * - * for { - * _ <- queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) - * envelope <- queue.poll - * _ <- IO.println(s"Received: \${envelope.payload}") - * _ <- envelope.acknowledge - * } yield () - * }}} - * - * @tparam A - * the type of message payloads - */ -object DelayedQueueInMemory { - - /** Creates an in-memory delayed queue with default configuration. - * - * @tparam A - * the type of message payloads - * @param timeConfig - * time configuration (defaults to [[DelayedQueueTimeConfig.DEFAULT_IN_MEMORY]]) - * @param ackEnvSource - * source identifier for envelopes (defaults to "delayed-queue-inmemory") - * @param clock - * clock for time operations (defaults to system UTC clock) - * @return - * a new DelayedQueue instance - */ - def create[A]( - timeConfig: DelayedQueueTimeConfig = DelayedQueueTimeConfig.DEFAULT_IN_MEMORY, - ackEnvSource: String = "delayed-queue-inmemory", - clock: JavaClock = JavaClock.systemUTC() - ): DelayedQueue[A] = { - val jvmQueue = jvm.DelayedQueueInMemory.create[A]( - timeConfig.asJava, - ackEnvSource, - clock - ) - new DelayedQueueInMemoryWrapper(jvmQueue) - } - - /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. - */ - private class DelayedQueueInMemoryWrapper[A](underlying: jvm.DelayedQueueInMemory[A]) - extends DelayedQueue[A] { - - override def getTimeConfig: IO[DelayedQueueTimeConfig] = - IO(underlying.getTimeConfig.asScala) - - override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) - - override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) - - override def offerBatch[In]( - messages: List[BatchedMessage[In, A]] - ): IO[List[BatchedReply[In, A]]] = - IO { - val javaMessages = messages.map(_.asJava).asJava - val javaReplies = underlying.offerBatch(javaMessages) - javaReplies.asScala.toList.map(_.asScala) - } - - override def tryPoll: IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.tryPoll()).map(_.asScala) - } - - override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = - IO { - val javaEnvelope = underlying.tryPollMany(batchMaxSize) - AckEnvelope( - payload = javaEnvelope.payload.asScala.toList, - messageId = MessageId.asScala(javaEnvelope.messageId), - timestamp = javaEnvelope.timestamp, - source = javaEnvelope.source, - deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), - acknowledge = IO.blocking(javaEnvelope.acknowledge()) - ) - } - - override def poll: IO[AckEnvelope[A]] = - IO.interruptible { - underlying.poll().asScala - } - - override def read(key: String): IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.read(key)).map(_.asScala) - } - - override def dropMessage(key: String): IO[Boolean] = - IO(underlying.dropMessage(key)) - - override def containsMessage(key: String): IO[Boolean] = - IO(underlying.containsMessage(key)) - - override def dropAllMessages(confirm: String): IO[Int] = - IO(underlying.dropAllMessages(confirm)) - - override def getCron: IO[CronService[A]] = - IO(new CronServiceWrapper(underlying.getCron)) - } - - /** Wrapper for CronService that delegates to the JVM implementation. */ - private class CronServiceWrapper[A](underlying: jvm.CronService[A]) extends CronService[A] { - - override def installTick( - configHash: CronConfigHash, - keyPrefix: String, - messages: List[CronMessage[A]] - ): IO[Unit] = - IO { - val javaMessages = messages.map(_.asJava).asJava - underlying.installTick(configHash.asJava, keyPrefix, javaMessages) - } - - override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = - IO { - underlying.uninstallTick(configHash.asJava, keyPrefix) - } - - override def install( - configHash: CronConfigHash, - keyPrefix: String, - scheduleInterval: java.time.Duration, - generateMany: CronMessageBatchGenerator[A] - ): cats.effect.Resource[IO, Unit] = { - import cats.effect.Resource - - val acquire = IO { - val javaGenerator = new jvm.CronMessageBatchGenerator[A] { - override def invoke(now: Instant): java.util.List[jvm.CronMessage[A]] = - generateMany(now).map(_.asJava).asJava - } - underlying.install(configHash.asJava, keyPrefix, scheduleInterval, javaGenerator) - } - - val release = (closeable: AutoCloseable) => IO(closeable.close()) - - Resource.make(acquire)(release).void - } - - override def installDailySchedule( - keyPrefix: String, - schedule: CronDailySchedule, - generator: CronMessageGenerator[A] - ): cats.effect.Resource[IO, Unit] = { - import cats.effect.Resource - - val acquire = IO { - val javaGenerator = new jvm.CronMessageGenerator[A] { - override def invoke(at: Instant): jvm.CronMessage[A] = - generator(at).asJava - } - underlying.installDailySchedule(keyPrefix, schedule.asJava, javaGenerator) - } - - val release = (closeable: AutoCloseable) => IO(closeable.close()) - - Resource.make(acquire)(release).void - } - - override def installPeriodicTick( - keyPrefix: String, - period: java.time.Duration, - generator: CronPayloadGenerator[A] - ): cats.effect.Resource[IO, Unit] = { - import cats.effect.Resource - - val acquire = IO { - val javaGenerator = new jvm.CronPayloadGenerator[A] { - override def invoke(at: Instant): A = generator(at) - } - underlying.installPeriodicTick(keyPrefix, period, javaGenerator) - } - - val release = (closeable: AutoCloseable) => IO(closeable.close()) - - Resource.make(acquire)(release).void - } - } -} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala similarity index 95% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala index 80ebc8a..5517dfb 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala @@ -16,7 +16,7 @@ package org.funfix.delayedqueue.scala -import cats.effect.IO +import cats.effect.Sync import java.time.Instant import org.funfix.delayedqueue.jvm @@ -52,13 +52,13 @@ import org.funfix.delayedqueue.jvm * @param acknowledge * IO action to call to acknowledge successful processing, and delete the message from the queue */ -final case class AckEnvelope[+A]( +final case class AckEnvelope[+F[_], +A]( payload: A, messageId: MessageId, timestamp: Instant, source: String, deliveryType: DeliveryType, - acknowledge: IO[Unit] + acknowledge: F[Unit] ) object AckEnvelope { @@ -67,14 +67,14 @@ object AckEnvelope { extension [A](javaEnv: jvm.AckEnvelope[A]) { /** Converts a JVM AckEnvelope to a Scala AckEnvelope. */ - def asScala: AckEnvelope[A] = + def asScala[F[_]: Sync]: AckEnvelope[F, A] = AckEnvelope( payload = javaEnv.payload, messageId = MessageId.asScala(javaEnv.messageId), timestamp = javaEnv.timestamp, source = javaEnv.source, deliveryType = DeliveryType.asScala(javaEnv.deliveryType), - acknowledge = IO.blocking(javaEnv.acknowledge()) + acknowledge = Sync[F].blocking(javaEnv.acknowledge()) ) } } diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala similarity index 87% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala index 09c43f9..a3f3290 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.scala -import cats.effect.IO import cats.effect.Resource import java.time.Duration import java.time.Instant @@ -30,7 +29,7 @@ import java.time.Instant * @tparam A * the type of message payload */ -trait CronService[A] { +trait CronService[F[_], A] { /** Installs a one-time set of future scheduled messages. * @@ -51,7 +50,7 @@ trait CronService[A] { configHash: CronConfigHash, keyPrefix: String, messages: List[CronMessage[A]] - ): IO[Unit] + ): F[Unit] /** Uninstalls all future messages for a specific cron configuration. * @@ -62,7 +61,7 @@ trait CronService[A] { * @param keyPrefix * prefix for message keys to remove */ - def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] + def uninstallTick(configHash: CronConfigHash, keyPrefix: String): F[Unit] /** Installs a cron-like schedule where messages are generated at intervals. * @@ -87,8 +86,8 @@ trait CronService[A] { configHash: CronConfigHash, keyPrefix: String, scheduleInterval: Duration, - generateMany: CronMessageBatchGenerator[A] - ): Resource[IO, Unit] + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[F, Unit] /** Installs a daily schedule with timezone-aware execution times. * @@ -107,8 +106,8 @@ trait CronService[A] { def installDailySchedule( keyPrefix: String, schedule: CronDailySchedule, - generator: CronMessageGenerator[A] - ): Resource[IO, Unit] + generator: (Instant) => CronMessage[A] + ): Resource[F, Unit] /** Installs a periodic tick that generates messages at fixed intervals. * @@ -127,20 +126,20 @@ trait CronService[A] { def installPeriodicTick( keyPrefix: String, period: Duration, - generator: CronPayloadGenerator[A] - ): Resource[IO, Unit] + generator: (Instant) => A + ): Resource[F, Unit] } -/** Generates a batch of cron messages based on the current instant. */ -trait CronMessageBatchGenerator[A] { +// /** Generates a batch of cron messages based on the current instant. */ +// trait CronMessageBatchGenerator[A] { - /** Creates a batch of cron messages. */ - def apply(now: Instant): List[CronMessage[A]] -} +// /** Creates a batch of cron messages. */ +// def apply(now: Instant): List[CronMessage[A]] +// } -/** Generates a payload for a given instant. */ -trait CronPayloadGenerator[A] { +// /** Generates a payload for a given instant. */ +// trait CronPayloadGenerator[A] { - /** Creates a payload for the given instant. */ - def apply(at: Instant): A -} +// /** Creates a payload for the given instant. */ +// def apply(at: Instant): A +// } diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala similarity index 87% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala index 05ee1de..8803814 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.scala -import cats.effect.IO import java.time.Instant /** A delayed queue for scheduled message processing with FIFO semantics. @@ -24,10 +23,10 @@ import java.time.Instant * @tparam A * the type of message payloads stored in the queue */ -trait DelayedQueue[A] { +trait DelayedQueue[F[_], A] { /** Returns the [DelayedQueueTimeConfig] with which this instance was initialized. */ - def getTimeConfig: IO[DelayedQueueTimeConfig] + def getTimeConfig: F[DelayedQueueTimeConfig] /** Offers a message for processing, at a specific timestamp. * @@ -41,10 +40,10 @@ trait DelayedQueue[A] { * @param scheduleAt * specifies when the message will become available for `poll` and processing */ - def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] + def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] /** Version of [offerOrUpdate] that only creates new entries and does not allow updates. */ - def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] + def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] /** Batched version of offer operations. * @@ -52,7 +51,7 @@ trait DelayedQueue[A] { * is the type of the input message, corresponding to each [ScheduledMessage]. This helps in * streaming the original input messages after processing the batch. */ - def offerBatch[In](messages: List[BatchedMessage[In, A]]): IO[List[BatchedReply[In, A]]] + def offerBatch[In](messages: List[BatchedMessage[In, A]]): F[List[BatchedReply[In, A]]] /** Pulls the first message to process from the queue (FIFO), returning `None` in case no such * message is available. @@ -60,7 +59,7 @@ trait DelayedQueue[A] { * This method locks the message for processing, making it invisible for other consumers (until * the configured timeout happens). */ - def tryPoll: IO[Option[AckEnvelope[A]]] + def tryPoll: F[Option[AckEnvelope[F, A]]] /** Pulls a batch of messages to process from the queue (FIFO), returning an empty list in case no * such messages are available. @@ -73,12 +72,12 @@ trait DelayedQueue[A] { * of returned messages can be smaller than this value, depending on how many messages are * available at the time of polling */ - def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] + def tryPollMany(batchMaxSize: Int): F[AckEnvelope[F, List[A]]] /** Extracts the next event from the delayed-queue, or waits until there's such an event * available. */ - def poll: IO[AckEnvelope[A]] + def poll: F[AckEnvelope[F, A]] /** Reads a message from the queue, corresponding to the given `key`, without locking it for * processing. @@ -90,10 +89,10 @@ trait DelayedQueue[A] { * WARNING: this operation invalidates the model of the queue. DO NOT USE! This is because * multiple consumers can process the same message, leading to potential issues. */ - def read(key: String): IO[Option[AckEnvelope[A]]] + def read(key: String): F[Option[AckEnvelope[F, A]]] /** Deletes a message from the queue that's associated with the given `key`. */ - def dropMessage(key: String): IO[Boolean] + def dropMessage(key: String): F[Boolean] /** Checks that a message exists in the queue. * @@ -102,7 +101,7 @@ trait DelayedQueue[A] { * @return * `true` in case a message with the given `key` exists in the queue, `false` otherwise */ - def containsMessage(key: String): IO[Boolean] + def containsMessage(key: String): F[Boolean] /** Drops all existing enqueued messages. * @@ -116,8 +115,8 @@ trait DelayedQueue[A] { * @return * the number of messages deleted */ - def dropAllMessages(confirm: String): IO[Int] + def dropAllMessages(confirm: String): F[Int] /** Utilities for installing cron-like schedules. */ - def getCron: IO[CronService[A]] + def cron: F[CronService[F, A]] } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala new file mode 100644 index 0000000..52126d2 --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -0,0 +1,246 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.{Sync, Async, Clock, Resource} +import cats.syntax.functor.* +import java.time.{Clock as JavaClock, Instant} +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.AckEnvelope.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.BatchedReply.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import scala.jdk.CollectionConverters.* +import cats.effect.std.Dispatcher + +/** In-memory implementation of [[DelayedQueue]] using concurrent data structures. + * + * This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueInMemory]] and + * provides an idiomatic Scala API with Cats Effect IO for managing side effects. + * + * ==Example== + * + * {{{ + * import cats.effect.IO + * import java.time.Instant + * + * def worker(queue: DelayedQueue[IO, String]): IO[Unit] = { + * val process1 = for { + * envelope <- queue.poll + * _ <- logger.info("Received: " + envelope.payload) + * _ <- envelope.acknowledge + * } yield () + * + * process1.attempt + * .onErrorHandleWith { error => + * logger.error("Error processing message, will reprocess after timeout", error) + * } + * .flatMap { _ => + * worker(queue) // Continue processing the next message + * } + * } + * + * DelayedQueueInMemory[IO, String]().use { queue => + * worker(queue).background.use { _ => + * // Push one message after 10 seconds + * queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) + * } + * } + * }}} + * + * @tparam A + * the type of message payloads + */ +object DelayedQueueInMemory { + + /** Creates an in-memory delayed queue with default configuration. + * + * @tparam A + * the type of message payloads + * @param timeConfig + * time configuration (defaults to [[DelayedQueueTimeConfig.DEFAULT_IN_MEMORY]]) + * @param ackEnvSource + * source identifier for envelopes (defaults to "delayed-queue-inmemory") + * @param clock + * clock for time operations (defaults to system UTC clock) + * @return + * a new DelayedQueue instance + */ + def apply[F[_], A]( + timeConfig: DelayedQueueTimeConfig = DelayedQueueTimeConfig.DEFAULT_IN_MEMORY, + ackEnvSource: String = "delayed-queue-inmemory" + )(using Async[F], Clock[F]): Resource[F, DelayedQueue[F, A]] = + Dispatcher.sequential[F].evalMap { dispatcher => + Sync[F].delay { + val javaClock = CatsClockToJavaClock[F](dispatcher) + val jvmQueue = jvm.DelayedQueueInMemory.create[A]( + timeConfig.asJava, + ackEnvSource, + javaClock + ) + new DelayedQueueInMemoryWrapper(jvmQueue) + } + } + + /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. + */ + private class DelayedQueueInMemoryWrapper[F[_], A]( + underlying: jvm.DelayedQueueInMemory[A] + )(using Sync[F]) extends DelayedQueue[F, A] { + + override def getTimeConfig: F[DelayedQueueTimeConfig] = + Sync[F].delay(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] = + Sync[F].delay(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] = + Sync[F].delay(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): F[List[BatchedReply[In, A]]] = + Sync[F].delay { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: F[Option[AckEnvelope[F, A]]] = + Sync[F].delay { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): F[AckEnvelope[F, List[A]]] = + Sync[F].delay { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = Sync[F].blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: F[AckEnvelope[F, A]] = + Sync[F].interruptible { + underlying.poll().asScala + } + + override def read(key: String): F[Option[AckEnvelope[F, A]]] = + Sync[F].delay { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): F[Boolean] = + Sync[F].delay(underlying.dropMessage(key)) + + override def containsMessage(key: String): F[Boolean] = + Sync[F].delay(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): F[Int] = + Sync[F].delay(underlying.dropAllMessages(confirm)) + + override def cron: F[CronService[F, A]] = + Sync[F].delay(new CronServiceWrapper(underlying.getCron)) + } + + /** Wrapper for CronService that delegates to the JVM implementation. */ + private class CronServiceWrapper[F[_], A]( + underlying: jvm.CronService[A] + )(using Sync[F]) extends CronService[F, A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): F[Unit] = + Sync[F].delay { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): F[Unit] = + Sync[F].delay { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[F, Unit] = { + import cats.effect.Resource + + Resource.fromAutoCloseable(Sync[F].delay { + underlying.install( + configHash.asJava, + keyPrefix, + scheduleInterval, + now => generateMany(now).map(_.asJava).asJava + ) + }).void + } + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: (Instant) => CronMessage[A] + ): Resource[F, Unit] = + Resource.fromAutoCloseable(Sync[F].delay { + underlying.installDailySchedule( + keyPrefix, + schedule.asJava, + now => generator(now).asJava + ) + }).void + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: (Instant) => A + ): cats.effect.Resource[F, Unit] = + Resource.fromAutoCloseable(Sync[F].delay { + underlying.installPeriodicTick( + keyPrefix, + period, + now => generator(now) + ) + }).void + } +} + +private final class CatsClockToJavaClock[+F[_]: Sync: Clock]( + dispatcher: Dispatcher[F], + zone: java.time.ZoneId = JavaClock.systemUTC().getZone +) extends JavaClock { + override def getZone: java.time.ZoneId = + zone + + override def withZone(zone: java.time.ZoneId): JavaClock = + new CatsClockToJavaClock(dispatcher, zone) + + override def instant(): Instant = + dispatcher.unsafeRunSync( + Clock[F] + .realTime + .map(it => Instant.ofEpochMilli(it.toMillis).nn) + ) +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/hello.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/hello.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/hello.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/hello.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala diff --git a/project/plugins.sbt b/project/plugins.sbt index da89c90..e612a01 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,3 @@ -addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") -addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.20.2") -addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.10") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.6") addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2") From d423bd97ce60cc7f22cd722a88f2d8bf55f08911 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 14:09:21 +0200 Subject: [PATCH 4/7] Fix API again --- .../delayedqueue/scala/AckEnvelope.scala | 10 +- .../delayedqueue/scala/CronService.scala | 22 ++-- .../delayedqueue/scala/DelayedQueue.scala | 27 ++--- .../scala/DelayedQueueInMemory.scala | 107 ++++++++---------- 4 files changed, 80 insertions(+), 86 deletions(-) diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala index 5517dfb..80ebc8a 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala @@ -16,7 +16,7 @@ package org.funfix.delayedqueue.scala -import cats.effect.Sync +import cats.effect.IO import java.time.Instant import org.funfix.delayedqueue.jvm @@ -52,13 +52,13 @@ import org.funfix.delayedqueue.jvm * @param acknowledge * IO action to call to acknowledge successful processing, and delete the message from the queue */ -final case class AckEnvelope[+F[_], +A]( +final case class AckEnvelope[+A]( payload: A, messageId: MessageId, timestamp: Instant, source: String, deliveryType: DeliveryType, - acknowledge: F[Unit] + acknowledge: IO[Unit] ) object AckEnvelope { @@ -67,14 +67,14 @@ object AckEnvelope { extension [A](javaEnv: jvm.AckEnvelope[A]) { /** Converts a JVM AckEnvelope to a Scala AckEnvelope. */ - def asScala[F[_]: Sync]: AckEnvelope[F, A] = + def asScala: AckEnvelope[A] = AckEnvelope( payload = javaEnv.payload, messageId = MessageId.asScala(javaEnv.messageId), timestamp = javaEnv.timestamp, source = javaEnv.source, deliveryType = DeliveryType.asScala(javaEnv.deliveryType), - acknowledge = Sync[F].blocking(javaEnv.acknowledge()) + acknowledge = IO.blocking(javaEnv.acknowledge()) ) } } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala index a3f3290..d09dd51 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala @@ -16,7 +16,7 @@ package org.funfix.delayedqueue.scala -import cats.effect.Resource +import cats.effect.{Resource, IO} import java.time.Duration import java.time.Instant @@ -29,7 +29,7 @@ import java.time.Instant * @tparam A * the type of message payload */ -trait CronService[F[_], A] { +trait CronService[A] { /** Installs a one-time set of future scheduled messages. * @@ -46,11 +46,11 @@ trait CronService[F[_], A] { * @param messages * list of messages to schedule */ - def installTick( + def installTick( configHash: CronConfigHash, keyPrefix: String, messages: List[CronMessage[A]] - ): F[Unit] + ): IO[Unit] /** Uninstalls all future messages for a specific cron configuration. * @@ -61,7 +61,7 @@ trait CronService[F[_], A] { * @param keyPrefix * prefix for message keys to remove */ - def uninstallTick(configHash: CronConfigHash, keyPrefix: String): F[Unit] + def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] /** Installs a cron-like schedule where messages are generated at intervals. * @@ -82,12 +82,12 @@ trait CronService[F[_], A] { * @return * a Resource that manages the lifecycle of the background scheduling process */ - def install( + def install( configHash: CronConfigHash, keyPrefix: String, scheduleInterval: Duration, generateMany: (Instant) => List[CronMessage[A]] - ): Resource[F, Unit] + ): Resource[IO, Unit] /** Installs a daily schedule with timezone-aware execution times. * @@ -103,11 +103,11 @@ trait CronService[F[_], A] { * @return * a Resource that manages the lifecycle of the background scheduling process */ - def installDailySchedule( + def installDailySchedule( keyPrefix: String, schedule: CronDailySchedule, generator: (Instant) => CronMessage[A] - ): Resource[F, Unit] + ): Resource[IO, Unit] /** Installs a periodic tick that generates messages at fixed intervals. * @@ -123,11 +123,11 @@ trait CronService[F[_], A] { * @return * a Resource that manages the lifecycle of the background scheduling process */ - def installPeriodicTick( + def installPeriodicTick( keyPrefix: String, period: Duration, generator: (Instant) => A - ): Resource[F, Unit] + ): Resource[IO, Unit] } // /** Generates a batch of cron messages based on the current instant. */ diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala index 8803814..1257bac 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala @@ -16,6 +16,7 @@ package org.funfix.delayedqueue.scala +import cats.effect.IO import java.time.Instant /** A delayed queue for scheduled message processing with FIFO semantics. @@ -23,10 +24,10 @@ import java.time.Instant * @tparam A * the type of message payloads stored in the queue */ -trait DelayedQueue[F[_], A] { +trait DelayedQueue[A] { /** Returns the [DelayedQueueTimeConfig] with which this instance was initialized. */ - def getTimeConfig: F[DelayedQueueTimeConfig] + def getTimeConfig: IO[DelayedQueueTimeConfig] /** Offers a message for processing, at a specific timestamp. * @@ -40,10 +41,10 @@ trait DelayedQueue[F[_], A] { * @param scheduleAt * specifies when the message will become available for `poll` and processing */ - def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] + def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] /** Version of [offerOrUpdate] that only creates new entries and does not allow updates. */ - def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] + def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] /** Batched version of offer operations. * @@ -51,7 +52,7 @@ trait DelayedQueue[F[_], A] { * is the type of the input message, corresponding to each [ScheduledMessage]. This helps in * streaming the original input messages after processing the batch. */ - def offerBatch[In](messages: List[BatchedMessage[In, A]]): F[List[BatchedReply[In, A]]] + def offerBatch[In](messages: List[BatchedMessage[In, A]]): IO[List[BatchedReply[In, A]]] /** Pulls the first message to process from the queue (FIFO), returning `None` in case no such * message is available. @@ -59,7 +60,7 @@ trait DelayedQueue[F[_], A] { * This method locks the message for processing, making it invisible for other consumers (until * the configured timeout happens). */ - def tryPoll: F[Option[AckEnvelope[F, A]]] + def tryPoll: IO[Option[AckEnvelope[A]]] /** Pulls a batch of messages to process from the queue (FIFO), returning an empty list in case no * such messages are available. @@ -72,12 +73,12 @@ trait DelayedQueue[F[_], A] { * of returned messages can be smaller than this value, depending on how many messages are * available at the time of polling */ - def tryPollMany(batchMaxSize: Int): F[AckEnvelope[F, List[A]]] + def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] /** Extracts the next event from the delayed-queue, or waits until there's such an event * available. */ - def poll: F[AckEnvelope[F, A]] + def poll: IO[AckEnvelope[A]] /** Reads a message from the queue, corresponding to the given `key`, without locking it for * processing. @@ -89,10 +90,10 @@ trait DelayedQueue[F[_], A] { * WARNING: this operation invalidates the model of the queue. DO NOT USE! This is because * multiple consumers can process the same message, leading to potential issues. */ - def read(key: String): F[Option[AckEnvelope[F, A]]] + def read(key: String): IO[Option[AckEnvelope[A]]] /** Deletes a message from the queue that's associated with the given `key`. */ - def dropMessage(key: String): F[Boolean] + def dropMessage(key: String): IO[Boolean] /** Checks that a message exists in the queue. * @@ -101,7 +102,7 @@ trait DelayedQueue[F[_], A] { * @return * `true` in case a message with the given `key` exists in the queue, `false` otherwise */ - def containsMessage(key: String): F[Boolean] + def containsMessage(key: String): IO[Boolean] /** Drops all existing enqueued messages. * @@ -115,8 +116,8 @@ trait DelayedQueue[F[_], A] { * @return * the number of messages deleted */ - def dropAllMessages(confirm: String): F[Int] + def dropAllMessages(confirm: String): IO[Int] /** Utilities for installing cron-like schedules. */ - def cron: F[CronService[F, A]] + def cron: IO[CronService[A]] } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala index 52126d2..87f46c3 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -16,7 +16,7 @@ package org.funfix.delayedqueue.scala -import cats.effect.{Sync, Async, Clock, Resource} +import cats.effect.{IO, Resource, Clock} import cats.syntax.functor.* import java.time.{Clock as JavaClock, Instant} import org.funfix.delayedqueue.jvm @@ -80,13 +80,13 @@ object DelayedQueueInMemory { * @return * a new DelayedQueue instance */ - def apply[F[_], A]( + def apply[A]( timeConfig: DelayedQueueTimeConfig = DelayedQueueTimeConfig.DEFAULT_IN_MEMORY, ackEnvSource: String = "delayed-queue-inmemory" - )(using Async[F], Clock[F]): Resource[F, DelayedQueue[F, A]] = - Dispatcher.sequential[F].evalMap { dispatcher => - Sync[F].delay { - val javaClock = CatsClockToJavaClock[F](dispatcher) + ): Resource[IO, DelayedQueue[A]] = + Dispatcher.sequential[IO].evalMap { dispatcher => + IO { + val javaClock = CatsClockToJavaClock(dispatcher) val jvmQueue = jvm.DelayedQueueInMemory.create[A]( timeConfig.asJava, ackEnvSource, @@ -98,35 +98,35 @@ object DelayedQueueInMemory { /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. */ - private class DelayedQueueInMemoryWrapper[F[_], A]( + private class DelayedQueueInMemoryWrapper[A]( underlying: jvm.DelayedQueueInMemory[A] - )(using Sync[F]) extends DelayedQueue[F, A] { + ) extends DelayedQueue[A] { - override def getTimeConfig: F[DelayedQueueTimeConfig] = - Sync[F].delay(underlying.getTimeConfig.asScala) + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) - override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] = - Sync[F].delay(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) - override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): F[OfferOutcome] = - Sync[F].delay(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) override def offerBatch[In]( messages: List[BatchedMessage[In, A]] - ): F[List[BatchedReply[In, A]]] = - Sync[F].delay { + ): IO[List[BatchedReply[In, A]]] = + IO { val javaMessages = messages.map(_.asJava).asJava val javaReplies = underlying.offerBatch(javaMessages) javaReplies.asScala.toList.map(_.asScala) } - override def tryPoll: F[Option[AckEnvelope[F, A]]] = - Sync[F].delay { + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { Option(underlying.tryPoll()).map(_.asScala) } - override def tryPollMany(batchMaxSize: Int): F[AckEnvelope[F, List[A]]] = - Sync[F].delay { + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { val javaEnvelope = underlying.tryPollMany(batchMaxSize) AckEnvelope( payload = javaEnvelope.payload.asScala.toList, @@ -134,50 +134,48 @@ object DelayedQueueInMemory { timestamp = javaEnvelope.timestamp, source = javaEnvelope.source, deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), - acknowledge = Sync[F].blocking(javaEnvelope.acknowledge()) + acknowledge = IO.blocking(javaEnvelope.acknowledge()) ) } - override def poll: F[AckEnvelope[F, A]] = - Sync[F].interruptible { - underlying.poll().asScala - } + override def poll: IO[AckEnvelope[A]] = + IO.interruptible(underlying.poll().asScala) - override def read(key: String): F[Option[AckEnvelope[F, A]]] = - Sync[F].delay { + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { Option(underlying.read(key)).map(_.asScala) } - override def dropMessage(key: String): F[Boolean] = - Sync[F].delay(underlying.dropMessage(key)) + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) - override def containsMessage(key: String): F[Boolean] = - Sync[F].delay(underlying.containsMessage(key)) + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) - override def dropAllMessages(confirm: String): F[Int] = - Sync[F].delay(underlying.dropAllMessages(confirm)) + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) - override def cron: F[CronService[F, A]] = - Sync[F].delay(new CronServiceWrapper(underlying.getCron)) + override def cron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) } /** Wrapper for CronService that delegates to the JVM implementation. */ - private class CronServiceWrapper[F[_], A]( + private class CronServiceWrapper[A]( underlying: jvm.CronService[A] - )(using Sync[F]) extends CronService[F, A] { + ) extends CronService[A] { override def installTick( configHash: CronConfigHash, keyPrefix: String, messages: List[CronMessage[A]] - ): F[Unit] = - Sync[F].delay { + ): IO[Unit] = + IO { val javaMessages = messages.map(_.asJava).asJava underlying.installTick(configHash.asJava, keyPrefix, javaMessages) } - override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): F[Unit] = - Sync[F].delay { + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { underlying.uninstallTick(configHash.asJava, keyPrefix) } @@ -186,10 +184,8 @@ object DelayedQueueInMemory { keyPrefix: String, scheduleInterval: java.time.Duration, generateMany: (Instant) => List[CronMessage[A]] - ): Resource[F, Unit] = { - import cats.effect.Resource - - Resource.fromAutoCloseable(Sync[F].delay { + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { underlying.install( configHash.asJava, keyPrefix, @@ -197,14 +193,13 @@ object DelayedQueueInMemory { now => generateMany(now).map(_.asJava).asJava ) }).void - } override def installDailySchedule( keyPrefix: String, schedule: CronDailySchedule, generator: (Instant) => CronMessage[A] - ): Resource[F, Unit] = - Resource.fromAutoCloseable(Sync[F].delay { + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { underlying.installDailySchedule( keyPrefix, schedule.asJava, @@ -216,8 +211,8 @@ object DelayedQueueInMemory { keyPrefix: String, period: java.time.Duration, generator: (Instant) => A - ): cats.effect.Resource[F, Unit] = - Resource.fromAutoCloseable(Sync[F].delay { + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { underlying.installPeriodicTick( keyPrefix, period, @@ -227,10 +222,10 @@ object DelayedQueueInMemory { } } -private final class CatsClockToJavaClock[+F[_]: Sync: Clock]( - dispatcher: Dispatcher[F], - zone: java.time.ZoneId = JavaClock.systemUTC().getZone -) extends JavaClock { +private final class CatsClockToJavaClock( + dispatcher: Dispatcher[IO], + zone: java.time.ZoneId = java.time.ZoneId.systemDefault() +)(using Clock[IO]) extends JavaClock { override def getZone: java.time.ZoneId = zone @@ -239,8 +234,6 @@ private final class CatsClockToJavaClock[+F[_]: Sync: Clock]( override def instant(): Instant = dispatcher.unsafeRunSync( - Clock[F] - .realTime - .map(it => Instant.ofEpochMilli(it.toMillis).nn) + Clock[IO].realTime.map(it => Instant.ofEpochMilli(it.toMillis)) ) } From 1b2a641cd246f8ef8d8dcc0e26fe5717b0b4e1e6 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 15:21:21 +0200 Subject: [PATCH 5/7] Reformatting --- build.sbt | 1 + .../delayedqueue/scala/CronService.scala | 16 +- .../scala/DelayedQueueInMemorySpec.scala | 346 +++++++++++------- 3 files changed, 213 insertions(+), 150 deletions(-) diff --git a/build.sbt b/build.sbt index 76ddf03..40590b3 100644 --- a/build.sbt +++ b/build.sbt @@ -74,6 +74,7 @@ lazy val delayedqueueJVM = project // Testing "org.scalameta" %% "munit" % "1.0.4" % Test, "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test, + "org.typelevel" %% "cats-effect-testkit" % "3.6.3" % Test, "org.scalacheck" %% "scalacheck" % "1.19.0" % Test, "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, ) diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala index d09dd51..45fc7bd 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala @@ -46,11 +46,11 @@ trait CronService[A] { * @param messages * list of messages to schedule */ - def installTick( + def installTick( configHash: CronConfigHash, keyPrefix: String, messages: List[CronMessage[A]] - ): IO[Unit] + ): IO[Unit] /** Uninstalls all future messages for a specific cron configuration. * @@ -82,12 +82,12 @@ trait CronService[A] { * @return * a Resource that manages the lifecycle of the background scheduling process */ - def install( + def install( configHash: CronConfigHash, keyPrefix: String, scheduleInterval: Duration, generateMany: (Instant) => List[CronMessage[A]] - ): Resource[IO, Unit] + ): Resource[IO, Unit] /** Installs a daily schedule with timezone-aware execution times. * @@ -103,11 +103,11 @@ trait CronService[A] { * @return * a Resource that manages the lifecycle of the background scheduling process */ - def installDailySchedule( + def installDailySchedule( keyPrefix: String, schedule: CronDailySchedule, generator: (Instant) => CronMessage[A] - ): Resource[IO, Unit] + ): Resource[IO, Unit] /** Installs a periodic tick that generates messages at fixed intervals. * @@ -123,11 +123,11 @@ trait CronService[A] { * @return * a Resource that manages the lifecycle of the background scheduling process */ - def installPeriodicTick( + def installPeriodicTick( keyPrefix: String, period: Duration, generator: (Instant) => A - ): Resource[IO, Unit] + ): Resource[IO, Unit] } // /** Generates a batch of cron messages based on the current instant. */ diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala index f8485b5..84a4485 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala @@ -17,189 +17,219 @@ package org.funfix.delayedqueue.scala import cats.effect.IO -import cats.effect.unsafe.implicits.global import java.time.Instant +import munit.CatsEffectSuite import scala.concurrent.duration.* -class DelayedQueueInMemorySpec extends munit.FunSuite { +class DelayedQueueInMemorySpec extends CatsEffectSuite { - test("create should return a working queue") { - val queue = DelayedQueueInMemory.create[String]() - val result = queue.getTimeConfig.unsafeRunSync() - assertEquals(result, DelayedQueueTimeConfig.DEFAULT_IN_MEMORY) + test("apply should return a working queue") { + DelayedQueueInMemory[String]().use { queue => + queue.getTimeConfig.assertEquals(DelayedQueueTimeConfig.DEFAULT_IN_MEMORY) + } } test("offerOrUpdate should create a new message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - val result = queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - assertEquals(result, OfferOutcome.Created) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } } test("offerOrUpdate should update an existing message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val result = queue.offerOrUpdate("key1", "payload2", scheduleAt.plusSeconds(5)).unsafeRunSync() - - assertEquals(result, OfferOutcome.Updated) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + result <- queue.offerOrUpdate("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Updated) + } } test("offerIfNotExists should create a new message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - val result = queue.offerIfNotExists("key1", "payload1", scheduleAt).unsafeRunSync() - assertEquals(result, OfferOutcome.Created) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } } test("offerIfNotExists should ignore existing message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - queue.offerIfNotExists("key1", "payload1", scheduleAt).unsafeRunSync() - val result = - queue.offerIfNotExists("key1", "payload2", scheduleAt.plusSeconds(5)).unsafeRunSync() - - assertEquals(result, OfferOutcome.Ignored) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + result <- queue.offerIfNotExists("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Ignored) + } } test("tryPoll should return None when no messages are available") { - val queue = DelayedQueueInMemory.create[String]() - val result = queue.tryPoll.unsafeRunSync() - assertEquals(result, None) + DelayedQueueInMemory[String]().use { queue => + queue.tryPoll.assertEquals(None) + } } test("tryPoll should return a message when scheduled time has passed") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().minusSeconds(1) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val envelope = queue.tryPoll.unsafeRunSync() - - assert(envelope.isDefined) - assertEquals(envelope.get.payload, "payload1") - assertEquals(envelope.get.messageId.value, "key1") + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined) + assertEquals(envelope.get.payload, "payload1") + assertEquals(envelope.get.messageId.value, "key1") + } + } yield () + } } test("tryPollMany should return empty list when no messages are available") { - val queue = DelayedQueueInMemory.create[String]() - val envelope = queue.tryPollMany(10).unsafeRunSync() - assertEquals(envelope.payload, List.empty[String]) + DelayedQueueInMemory[String]().use { queue => + queue.tryPollMany(10).map { envelope => + assertEquals(envelope.payload, List.empty[String]) + } + } } test("tryPollMany should return multiple messages") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().minusSeconds(1) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - queue.offerOrUpdate("key2", "payload2", scheduleAt).unsafeRunSync() - queue.offerOrUpdate("key3", "payload3", scheduleAt).unsafeRunSync() - - val envelope = queue.tryPollMany(5).unsafeRunSync() - - assertEquals(envelope.payload.length, 3) - assert(envelope.payload.contains("payload1")) - assert(envelope.payload.contains("payload2")) - assert(envelope.payload.contains("payload3")) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.offerOrUpdate("key3", "payload3", scheduleAt) + envelope <- queue.tryPollMany(5) + _ <- IO { + assertEquals(envelope.payload.length, 3) + assertEquals( + envelope.payload.toSet, + Set("payload1", "payload2", "payload3"), + "tryPollMany should return all three messages" + ) + } + } yield () + } } test("offerBatch should handle multiple messages") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - val messages = List( - BatchedMessage("input1", ScheduledMessage("key1", "payload1", scheduleAt, canUpdate = true)), - BatchedMessage("input2", ScheduledMessage("key2", "payload2", scheduleAt, canUpdate = true)) - ) - - val replies = queue.offerBatch(messages).unsafeRunSync() - - assertEquals(replies.length, 2) - assertEquals(replies(0).outcome, OfferOutcome.Created) - assertEquals(replies(1).outcome, OfferOutcome.Created) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + messages = List( + BatchedMessage( + "input1", + ScheduledMessage("key1", "payload1", scheduleAt, canUpdate = true) + ), + BatchedMessage( + "input2", + ScheduledMessage("key2", "payload2", scheduleAt, canUpdate = true) + ) + ) + replies <- queue.offerBatch(messages) + _ <- IO { + assertEquals(replies.length, 2) + assertEquals(replies(0).outcome, OfferOutcome.Created) + assertEquals(replies(1).outcome, OfferOutcome.Created) + } + } yield () + } } test("read should return a message without locking it") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val envelope = queue.read("key1").unsafeRunSync() - - assert(envelope.isDefined) - assertEquals(envelope.get.payload, "payload1") - - // Message should still exist - val stillExists = queue.containsMessage("key1").unsafeRunSync() - assert(stillExists) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.read("key1") + stillExists <- queue.containsMessage("key1") + _ <- IO { + assert(envelope.isDefined, "envelope should be defined") + assertEquals(envelope.get.payload, "payload1") + assert(stillExists, "message should still exist after read") + } + } yield () + } } test("dropMessage should remove a message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val dropped = queue.dropMessage("key1").unsafeRunSync() - - assert(dropped) - - val exists = queue.containsMessage("key1").unsafeRunSync() - assert(!exists) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + dropped <- queue.dropMessage("key1") + exists <- queue.containsMessage("key1") + _ <- IO { + assert(dropped, "dropMessage should return true") + assert(!exists, "message should not exist after drop") + } + } yield () + } } test("containsMessage should return true for existing message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val exists = queue.containsMessage("key1").unsafeRunSync() - - assert(exists) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + exists <- queue.containsMessage("key1") + _ <- IO(assert(exists, "message should exist")) + } yield () + } } test("containsMessage should return false for non-existing message") { - val queue = DelayedQueueInMemory.create[String]() - val exists = queue.containsMessage("nonexistent").unsafeRunSync() - assert(!exists) + DelayedQueueInMemory[String]().use { queue => + queue.containsMessage("nonexistent").map { exists => + assert(!exists, "nonexistent message should not exist") + } + } } test("dropAllMessages should remove all messages") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().plusSeconds(10) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - queue.offerOrUpdate("key2", "payload2", scheduleAt).unsafeRunSync() - - val count = queue.dropAllMessages("Yes, please, I know what I'm doing!").unsafeRunSync() - - assertEquals(count, 2) - - val exists1 = queue.containsMessage("key1").unsafeRunSync() - val exists2 = queue.containsMessage("key2").unsafeRunSync() - assert(!exists1) - assert(!exists2) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + count <- queue.dropAllMessages("Yes, please, I know what I'm doing!") + exists1 <- queue.containsMessage("key1") + exists2 <- queue.containsMessage("key2") + _ <- IO { + assertEquals(count, 2) + assert(!exists1, "key1 should not exist after dropAll") + assert(!exists2, "key2 should not exist after dropAll") + } + } yield () + } } test("acknowledge should delete the message") { - val queue = DelayedQueueInMemory.create[String]() - val scheduleAt = Instant.now().minusSeconds(1) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val envelope = queue.tryPoll.unsafeRunSync() - - assert(envelope.isDefined) - envelope.get.acknowledge.unsafeRunSync() - - // Message should be deleted after acknowledgment - val exists = queue.containsMessage("key1").unsafeRunSync() - assert(!exists) + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- { + assert(envelope.isDefined, "envelope should be defined") + envelope.get.acknowledge + } + exists <- queue.containsMessage("key1") + _ <- IO(assert(!exists, "message should be deleted after acknowledgment")) + } yield () + } } - test("getCron should return a CronService") { - val queue = DelayedQueueInMemory.create[String]() - val cronService = queue.getCron.unsafeRunSync() - assert(cronService != null) + test("cron should return a CronService") { + DelayedQueueInMemory[String]().use { queue => + queue.cron.map { cronService => + assert(cronService != null, "cronService should not be null") + } + } } test("custom timeConfig should be used") { @@ -207,19 +237,51 @@ class DelayedQueueInMemorySpec extends munit.FunSuite { acquireTimeout = 60.seconds, pollPeriod = 200.milliseconds ) - val queue = DelayedQueueInMemory.create[String](timeConfig = customConfig) - val result = queue.getTimeConfig.unsafeRunSync() - assertEquals(result, customConfig) + DelayedQueueInMemory[String](timeConfig = customConfig).use { queue => + queue.getTimeConfig.assertEquals(customConfig) + } } test("custom ackEnvSource should be used") { - val queue = DelayedQueueInMemory.create[String](ackEnvSource = "custom-source") - val scheduleAt = Instant.now().minusSeconds(1) - - queue.offerOrUpdate("key1", "payload1", scheduleAt).unsafeRunSync() - val envelope = queue.tryPoll.unsafeRunSync() + DelayedQueueInMemory[String](ackEnvSource = "custom-source").use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined, "envelope should be defined") + assertEquals(envelope.get.source, "custom-source") + } + } yield () + } + } - assert(envelope.isDefined) - assertEquals(envelope.get.source, "custom-source") + test("time passage: offer in future, tryPoll returns None, advance time, tryPoll succeeds") { + DelayedQueueInMemory[String]().use { queue => + for { + now <- IO.realTime.map(d => Instant.ofEpochMilli(d.toMillis)) + futureTime = now.plusMillis(100) // 100ms in the future + pastTime = now.minusMillis(100) // 100ms in the past + + // Offer a message scheduled for the future + _ <- queue.offerOrUpdate("key1", "payload1", futureTime) + + // Try to poll immediately - should get None (not scheduled yet) + resultBefore <- queue.tryPoll + + // Offer a message scheduled for the past + _ <- queue.offerOrUpdate("key2", "payload2", pastTime) + + // Now tryPoll should succeed on the past-scheduled message + resultAfter <- queue.tryPoll + + _ <- IO { + assertEquals(resultBefore, None, "tryPoll should return None before scheduled time") + assert(resultAfter.isDefined, "tryPoll should return Some for past-scheduled message") + assertEquals(resultAfter.get.payload, "payload2") + assertEquals(resultAfter.get.messageId.value, "key2") + } + } yield () + } } } From eca35630fc6b6fb8fb031fd6d8af077b9647fec7 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 15:38:50 +0200 Subject: [PATCH 6/7] Remove junk --- .../funfix/delayedqueue/scala/CronService.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala index 45fc7bd..9160c38 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala @@ -129,17 +129,3 @@ trait CronService[A] { generator: (Instant) => A ): Resource[IO, Unit] } - -// /** Generates a batch of cron messages based on the current instant. */ -// trait CronMessageBatchGenerator[A] { - -// /** Creates a batch of cron messages. */ -// def apply(now: Instant): List[CronMessage[A]] -// } - -// /** Generates a payload for a given instant. */ -// trait CronPayloadGenerator[A] { - -// /** Creates a payload for the given instant. */ -// def apply(at: Instant): A -// } From a06995a6de5f19ca5fd63d2122978dd7bb152eaf Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 15:59:34 +0200 Subject: [PATCH 7/7] Add concurrency test --- .../scala/DelayedQueueInMemorySpec.scala | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala index 84a4485..ddaac1d 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala @@ -16,6 +16,7 @@ package org.funfix.delayedqueue.scala +import cats.syntax.all.* import cats.effect.IO import java.time.Instant import munit.CatsEffectSuite @@ -284,4 +285,72 @@ class DelayedQueueInMemorySpec extends CatsEffectSuite { } yield () } } + + test("concurrency") { + val producers = 4 + val consumers = 4 + val messageCount = 10000 + val now = Instant.now() + + assert(messageCount % producers == 0, "messageCount should be divisible by number of producers") + assert(messageCount % consumers == 0, "messageCount should be divisible by number of producers") + + def producer(queue: DelayedQueue[String], id: Int, count: Int): IO[Int] = + (1 to count).toList.traverse { i => + val key = s"producer-$id-message-$i" + val payload = s"payload-$key" + queue.offerOrUpdate(key, payload, now).map { outcome => + assertEquals(outcome, OfferOutcome.Created) + 1 + } + }.map { + _.sum + } + + def allProducers(queue: DelayedQueue[String]): IO[Int] = (1 to producers).toList.parTraverse { + id => + producer(queue, id, messageCount / producers) + }.map { + _.sum + } + + def consumer(queue: DelayedQueue[String], count: Int): IO[Int] = (1 to count).toList.traverse { + _ => + queue.poll.map { envelope => + assert( + envelope.payload.startsWith("payload-"), + "payload should have correct format" + ) + 1 + } + }.map { + _.sum + } + + def allConsumers(queue: DelayedQueue[String]): IO[Int] = (1 to consumers).toList.parTraverse { + _ => + consumer(queue, messageCount / consumers) + }.map { + _.sum + } + + val res = + for { + queue <- DelayedQueueInMemory[String]() + prodFiber <- allProducers(queue).background + conFiber <- allConsumers(queue).background + } yield (prodFiber, conFiber) + + res.use { case (prodFiber, conFiber) => + for { + p <- prodFiber + p <- p.embedNever + c <- conFiber + c <- c.embedNever + } yield { + assertEquals(p, messageCount) + assertEquals(c, messageCount) + } + }.timeout(30.seconds) + } }