diff --git a/build.sbt b/build.sbt index 7f517ab..40590b3 100644 --- a/build.sbt +++ b/build.sbt @@ -64,22 +64,18 @@ lazy val root = project ) .aggregate(delayedqueueJVM) -lazy val delayedqueue = crossProject(JVMPlatform) - .crossType(CrossType.Full) +lazy val delayedqueueJVM = project .in(file("delayedqueue-scala")) .settings( - name := "delayedqueue-scala" - ) - .jvmSettings( + name := "delayedqueue-scala", libraryDependencies ++= Seq( "org.funfix" % "delayedqueue-jvm" % version.value, "org.typelevel" %% "cats-effect" % "3.6.3", // Testing "org.scalameta" %% "munit" % "1.0.4" % Test, "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test, + "org.typelevel" %% "cats-effect-testkit" % "3.6.3" % Test, "org.scalacheck" %% "scalacheck" % "1.19.0" % Test, "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, ) ) - -lazy val delayedqueueJVM = delayedqueue.jvm diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala similarity index 89% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala index 09c43f9..9160c38 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/CronService.scala @@ -16,8 +16,7 @@ package org.funfix.delayedqueue.scala -import cats.effect.IO -import cats.effect.Resource +import cats.effect.{Resource, IO} import java.time.Duration import java.time.Instant @@ -87,7 +86,7 @@ trait CronService[A] { configHash: CronConfigHash, keyPrefix: String, scheduleInterval: Duration, - generateMany: CronMessageBatchGenerator[A] + generateMany: (Instant) => List[CronMessage[A]] ): Resource[IO, Unit] /** Installs a daily schedule with timezone-aware execution times. @@ -107,7 +106,7 @@ trait CronService[A] { def installDailySchedule( keyPrefix: String, schedule: CronDailySchedule, - generator: CronMessageGenerator[A] + generator: (Instant) => CronMessage[A] ): Resource[IO, Unit] /** Installs a periodic tick that generates messages at fixed intervals. @@ -127,20 +126,6 @@ trait CronService[A] { def installPeriodicTick( keyPrefix: String, period: Duration, - generator: CronPayloadGenerator[A] + generator: (Instant) => A ): Resource[IO, Unit] } - -/** Generates a batch of cron messages based on the current instant. */ -trait CronMessageBatchGenerator[A] { - - /** Creates a batch of cron messages. */ - def apply(now: Instant): List[CronMessage[A]] -} - -/** Generates a payload for a given instant. */ -trait CronPayloadGenerator[A] { - - /** Creates a payload for the given instant. */ - def apply(at: Instant): A -} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala similarity index 99% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala index 05ee1de..1257bac 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueue.scala @@ -119,5 +119,5 @@ trait DelayedQueue[A] { def dropAllMessages(confirm: String): IO[Int] /** Utilities for installing cron-like schedules. */ - def getCron: IO[CronService[A]] + def cron: IO[CronService[A]] } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala new file mode 100644 index 0000000..87f46c3 --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -0,0 +1,239 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.{IO, Resource, Clock} +import cats.syntax.functor.* +import java.time.{Clock as JavaClock, Instant} +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.AckEnvelope.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.BatchedReply.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import scala.jdk.CollectionConverters.* +import cats.effect.std.Dispatcher + +/** In-memory implementation of [[DelayedQueue]] using concurrent data structures. + * + * This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueInMemory]] and + * provides an idiomatic Scala API with Cats Effect IO for managing side effects. + * + * ==Example== + * + * {{{ + * import cats.effect.IO + * import java.time.Instant + * + * def worker(queue: DelayedQueue[IO, String]): IO[Unit] = { + * val process1 = for { + * envelope <- queue.poll + * _ <- logger.info("Received: " + envelope.payload) + * _ <- envelope.acknowledge + * } yield () + * + * process1.attempt + * .onErrorHandleWith { error => + * logger.error("Error processing message, will reprocess after timeout", error) + * } + * .flatMap { _ => + * worker(queue) // Continue processing the next message + * } + * } + * + * DelayedQueueInMemory[IO, String]().use { queue => + * worker(queue).background.use { _ => + * // Push one message after 10 seconds + * queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) + * } + * } + * }}} + * + * @tparam A + * the type of message payloads + */ +object DelayedQueueInMemory { + + /** Creates an in-memory delayed queue with default configuration. + * + * @tparam A + * the type of message payloads + * @param timeConfig + * time configuration (defaults to [[DelayedQueueTimeConfig.DEFAULT_IN_MEMORY]]) + * @param ackEnvSource + * source identifier for envelopes (defaults to "delayed-queue-inmemory") + * @param clock + * clock for time operations (defaults to system UTC clock) + * @return + * a new DelayedQueue instance + */ + def apply[A]( + timeConfig: DelayedQueueTimeConfig = DelayedQueueTimeConfig.DEFAULT_IN_MEMORY, + ackEnvSource: String = "delayed-queue-inmemory" + ): Resource[IO, DelayedQueue[A]] = + Dispatcher.sequential[IO].evalMap { dispatcher => + IO { + val javaClock = CatsClockToJavaClock(dispatcher) + val jvmQueue = jvm.DelayedQueueInMemory.create[A]( + timeConfig.asJava, + ackEnvSource, + javaClock + ) + new DelayedQueueInMemoryWrapper(jvmQueue) + } + } + + /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. + */ + private class DelayedQueueInMemoryWrapper[A]( + underlying: jvm.DelayedQueueInMemory[A] + ) extends DelayedQueue[A] { + + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): IO[List[BatchedReply[In, A]]] = + IO { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = IO.blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: IO[AckEnvelope[A]] = + IO.interruptible(underlying.poll().asScala) + + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) + + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) + + override def cron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) + } + + /** Wrapper for CronService that delegates to the JVM implementation. */ + private class CronServiceWrapper[A]( + underlying: jvm.CronService[A] + ) extends CronService[A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): IO[Unit] = + IO { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.install( + configHash.asJava, + keyPrefix, + scheduleInterval, + now => generateMany(now).map(_.asJava).asJava + ) + }).void + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: (Instant) => CronMessage[A] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installDailySchedule( + keyPrefix, + schedule.asJava, + now => generator(now).asJava + ) + }).void + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: (Instant) => A + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installPeriodicTick( + keyPrefix, + period, + now => generator(now) + ) + }).void + } +} + +private final class CatsClockToJavaClock( + dispatcher: Dispatcher[IO], + zone: java.time.ZoneId = java.time.ZoneId.systemDefault() +)(using Clock[IO]) extends JavaClock { + override def getZone: java.time.ZoneId = + zone + + override def withZone(zone: java.time.ZoneId): JavaClock = + new CatsClockToJavaClock(dispatcher, zone) + + override def instant(): Instant = + dispatcher.unsafeRunSync( + Clock[IO].realTime.map(it => Instant.ofEpochMilli(it.toMillis)) + ) +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/hello.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/hello.scala similarity index 100% rename from delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/hello.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/hello.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala new file mode 100644 index 0000000..ddaac1d --- /dev/null +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemorySpec.scala @@ -0,0 +1,356 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.syntax.all.* +import cats.effect.IO +import java.time.Instant +import munit.CatsEffectSuite +import scala.concurrent.duration.* + +class DelayedQueueInMemorySpec extends CatsEffectSuite { + + test("apply should return a working queue") { + DelayedQueueInMemory[String]().use { queue => + queue.getTimeConfig.assertEquals(DelayedQueueTimeConfig.DEFAULT_IN_MEMORY) + } + } + + test("offerOrUpdate should create a new message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } + } + + test("offerOrUpdate should update an existing message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + result <- queue.offerOrUpdate("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Updated) + } + } + + test("offerIfNotExists should create a new message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } + } + + test("offerIfNotExists should ignore existing message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + result <- queue.offerIfNotExists("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Ignored) + } + } + + test("tryPoll should return None when no messages are available") { + DelayedQueueInMemory[String]().use { queue => + queue.tryPoll.assertEquals(None) + } + } + + test("tryPoll should return a message when scheduled time has passed") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined) + assertEquals(envelope.get.payload, "payload1") + assertEquals(envelope.get.messageId.value, "key1") + } + } yield () + } + } + + test("tryPollMany should return empty list when no messages are available") { + DelayedQueueInMemory[String]().use { queue => + queue.tryPollMany(10).map { envelope => + assertEquals(envelope.payload, List.empty[String]) + } + } + } + + test("tryPollMany should return multiple messages") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.offerOrUpdate("key3", "payload3", scheduleAt) + envelope <- queue.tryPollMany(5) + _ <- IO { + assertEquals(envelope.payload.length, 3) + assertEquals( + envelope.payload.toSet, + Set("payload1", "payload2", "payload3"), + "tryPollMany should return all three messages" + ) + } + } yield () + } + } + + test("offerBatch should handle multiple messages") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + messages = List( + BatchedMessage( + "input1", + ScheduledMessage("key1", "payload1", scheduleAt, canUpdate = true) + ), + BatchedMessage( + "input2", + ScheduledMessage("key2", "payload2", scheduleAt, canUpdate = true) + ) + ) + replies <- queue.offerBatch(messages) + _ <- IO { + assertEquals(replies.length, 2) + assertEquals(replies(0).outcome, OfferOutcome.Created) + assertEquals(replies(1).outcome, OfferOutcome.Created) + } + } yield () + } + } + + test("read should return a message without locking it") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.read("key1") + stillExists <- queue.containsMessage("key1") + _ <- IO { + assert(envelope.isDefined, "envelope should be defined") + assertEquals(envelope.get.payload, "payload1") + assert(stillExists, "message should still exist after read") + } + } yield () + } + } + + test("dropMessage should remove a message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + dropped <- queue.dropMessage("key1") + exists <- queue.containsMessage("key1") + _ <- IO { + assert(dropped, "dropMessage should return true") + assert(!exists, "message should not exist after drop") + } + } yield () + } + } + + test("containsMessage should return true for existing message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + exists <- queue.containsMessage("key1") + _ <- IO(assert(exists, "message should exist")) + } yield () + } + } + + test("containsMessage should return false for non-existing message") { + DelayedQueueInMemory[String]().use { queue => + queue.containsMessage("nonexistent").map { exists => + assert(!exists, "nonexistent message should not exist") + } + } + } + + test("dropAllMessages should remove all messages") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + count <- queue.dropAllMessages("Yes, please, I know what I'm doing!") + exists1 <- queue.containsMessage("key1") + exists2 <- queue.containsMessage("key2") + _ <- IO { + assertEquals(count, 2) + assert(!exists1, "key1 should not exist after dropAll") + assert(!exists2, "key2 should not exist after dropAll") + } + } yield () + } + } + + test("acknowledge should delete the message") { + DelayedQueueInMemory[String]().use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- { + assert(envelope.isDefined, "envelope should be defined") + envelope.get.acknowledge + } + exists <- queue.containsMessage("key1") + _ <- IO(assert(!exists, "message should be deleted after acknowledgment")) + } yield () + } + } + + test("cron should return a CronService") { + DelayedQueueInMemory[String]().use { queue => + queue.cron.map { cronService => + assert(cronService != null, "cronService should not be null") + } + } + } + + test("custom timeConfig should be used") { + val customConfig = DelayedQueueTimeConfig( + acquireTimeout = 60.seconds, + pollPeriod = 200.milliseconds + ) + DelayedQueueInMemory[String](timeConfig = customConfig).use { queue => + queue.getTimeConfig.assertEquals(customConfig) + } + } + + test("custom ackEnvSource should be used") { + DelayedQueueInMemory[String](ackEnvSource = "custom-source").use { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined, "envelope should be defined") + assertEquals(envelope.get.source, "custom-source") + } + } yield () + } + } + + test("time passage: offer in future, tryPoll returns None, advance time, tryPoll succeeds") { + DelayedQueueInMemory[String]().use { queue => + for { + now <- IO.realTime.map(d => Instant.ofEpochMilli(d.toMillis)) + futureTime = now.plusMillis(100) // 100ms in the future + pastTime = now.minusMillis(100) // 100ms in the past + + // Offer a message scheduled for the future + _ <- queue.offerOrUpdate("key1", "payload1", futureTime) + + // Try to poll immediately - should get None (not scheduled yet) + resultBefore <- queue.tryPoll + + // Offer a message scheduled for the past + _ <- queue.offerOrUpdate("key2", "payload2", pastTime) + + // Now tryPoll should succeed on the past-scheduled message + resultAfter <- queue.tryPoll + + _ <- IO { + assertEquals(resultBefore, None, "tryPoll should return None before scheduled time") + assert(resultAfter.isDefined, "tryPoll should return Some for past-scheduled message") + assertEquals(resultAfter.get.payload, "payload2") + assertEquals(resultAfter.get.messageId.value, "key2") + } + } yield () + } + } + + test("concurrency") { + val producers = 4 + val consumers = 4 + val messageCount = 10000 + val now = Instant.now() + + assert(messageCount % producers == 0, "messageCount should be divisible by number of producers") + assert(messageCount % consumers == 0, "messageCount should be divisible by number of producers") + + def producer(queue: DelayedQueue[String], id: Int, count: Int): IO[Int] = + (1 to count).toList.traverse { i => + val key = s"producer-$id-message-$i" + val payload = s"payload-$key" + queue.offerOrUpdate(key, payload, now).map { outcome => + assertEquals(outcome, OfferOutcome.Created) + 1 + } + }.map { + _.sum + } + + def allProducers(queue: DelayedQueue[String]): IO[Int] = (1 to producers).toList.parTraverse { + id => + producer(queue, id, messageCount / producers) + }.map { + _.sum + } + + def consumer(queue: DelayedQueue[String], count: Int): IO[Int] = (1 to count).toList.traverse { + _ => + queue.poll.map { envelope => + assert( + envelope.payload.startsWith("payload-"), + "payload should have correct format" + ) + 1 + } + }.map { + _.sum + } + + def allConsumers(queue: DelayedQueue[String]): IO[Int] = (1 to consumers).toList.parTraverse { + _ => + consumer(queue, messageCount / consumers) + }.map { + _.sum + } + + val res = + for { + queue <- DelayedQueueInMemory[String]() + prodFiber <- allProducers(queue).background + conFiber <- allConsumers(queue).background + } yield (prodFiber, conFiber) + + res.use { case (prodFiber, conFiber) => + for { + p <- prodFiber + p <- p.embedNever + c <- conFiber + c <- c.embedNever + } yield { + assertEquals(p, messageCount) + assertEquals(c, messageCount) + } + }.timeout(30.seconds) + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/HelloSuite.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala similarity index 100% rename from delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala rename to delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala diff --git a/project/plugins.sbt b/project/plugins.sbt index da89c90..e612a01 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,3 @@ -addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") -addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.20.2") -addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.10") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.6") addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2")