diff --git a/build.sbt b/build.sbt index 40590b3..585dbec 100644 --- a/build.sbt +++ b/build.sbt @@ -14,6 +14,7 @@ inThisBuild( scalacOptions ++= Seq( "-no-indent", "-Yexplicit-nulls", + "-Werror", ), // --- // Settings for dealing with the local Gradle-assembled artifacts @@ -77,5 +78,9 @@ lazy val delayedqueueJVM = project "org.typelevel" %% "cats-effect-testkit" % "3.6.3" % Test, "org.scalacheck" %% "scalacheck" % "1.19.0" % Test, "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, + // JDBC drivers for testing + "com.h2database" % "h2" % "2.4.240" % Test, + "org.hsqldb" % "hsqldb" % "2.7.4" % Test, + "org.xerial" % "sqlite-jdbc" % "3.51.1.0" % Test, ) ) diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala index 87f46c3..3fc449b 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -17,14 +17,8 @@ package org.funfix.delayedqueue.scala import cats.effect.{IO, Resource, Clock} -import cats.syntax.functor.* import java.time.{Clock as JavaClock, Instant} import org.funfix.delayedqueue.jvm -import org.funfix.delayedqueue.scala.AckEnvelope.asScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala -import org.funfix.delayedqueue.scala.BatchedReply.asScala -import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala -import scala.jdk.CollectionConverters.* import cats.effect.std.Dispatcher /** In-memory implementation of [[DelayedQueue]] using concurrent data structures. @@ -92,137 +86,12 @@ object DelayedQueueInMemory { ackEnvSource, javaClock ) - new DelayedQueueInMemoryWrapper(jvmQueue) + new DelayedQueueWrapper(jvmQueue) } } - - /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. - */ - private class DelayedQueueInMemoryWrapper[A]( - underlying: jvm.DelayedQueueInMemory[A] - ) extends DelayedQueue[A] { - - override def getTimeConfig: IO[DelayedQueueTimeConfig] = - IO(underlying.getTimeConfig.asScala) - - override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) - - override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) - - override def offerBatch[In]( - messages: List[BatchedMessage[In, A]] - ): IO[List[BatchedReply[In, A]]] = - IO { - val javaMessages = messages.map(_.asJava).asJava - val javaReplies = underlying.offerBatch(javaMessages) - javaReplies.asScala.toList.map(_.asScala) - } - - override def tryPoll: IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.tryPoll()).map(_.asScala) - } - - override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = - IO { - val javaEnvelope = underlying.tryPollMany(batchMaxSize) - AckEnvelope( - payload = javaEnvelope.payload.asScala.toList, - messageId = MessageId.asScala(javaEnvelope.messageId), - timestamp = javaEnvelope.timestamp, - source = javaEnvelope.source, - deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), - acknowledge = IO.blocking(javaEnvelope.acknowledge()) - ) - } - - override def poll: IO[AckEnvelope[A]] = - IO.interruptible(underlying.poll().asScala) - - override def read(key: String): IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.read(key)).map(_.asScala) - } - - override def dropMessage(key: String): IO[Boolean] = - IO(underlying.dropMessage(key)) - - override def containsMessage(key: String): IO[Boolean] = - IO(underlying.containsMessage(key)) - - override def dropAllMessages(confirm: String): IO[Int] = - IO(underlying.dropAllMessages(confirm)) - - override def cron: IO[CronService[A]] = - IO(new CronServiceWrapper(underlying.getCron)) - } - - /** Wrapper for CronService that delegates to the JVM implementation. */ - private class CronServiceWrapper[A]( - underlying: jvm.CronService[A] - ) extends CronService[A] { - - override def installTick( - configHash: CronConfigHash, - keyPrefix: String, - messages: List[CronMessage[A]] - ): IO[Unit] = - IO { - val javaMessages = messages.map(_.asJava).asJava - underlying.installTick(configHash.asJava, keyPrefix, javaMessages) - } - - override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = - IO { - underlying.uninstallTick(configHash.asJava, keyPrefix) - } - - override def install( - configHash: CronConfigHash, - keyPrefix: String, - scheduleInterval: java.time.Duration, - generateMany: (Instant) => List[CronMessage[A]] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.install( - configHash.asJava, - keyPrefix, - scheduleInterval, - now => generateMany(now).map(_.asJava).asJava - ) - }).void - - override def installDailySchedule( - keyPrefix: String, - schedule: CronDailySchedule, - generator: (Instant) => CronMessage[A] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installDailySchedule( - keyPrefix, - schedule.asJava, - now => generator(now).asJava - ) - }).void - - override def installPeriodicTick( - keyPrefix: String, - period: java.time.Duration, - generator: (Instant) => A - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installPeriodicTick( - keyPrefix, - period, - now => generator(now) - ) - }).void - } } -private final class CatsClockToJavaClock( +private[scala] final class CatsClockToJavaClock( dispatcher: Dispatcher[IO], zone: java.time.ZoneId = java.time.ZoneId.systemDefault() )(using Clock[IO]) extends JavaClock { diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala new file mode 100644 index 0000000..7e880bb --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala @@ -0,0 +1,108 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.{IO, Resource} +import cats.effect.std.Dispatcher +import org.funfix.delayedqueue.jvm + +/** JDBC-based implementation of [[DelayedQueue]] with support for multiple database backends. + * + * This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueJDBC]] and provides + * an idiomatic Scala API with Cats Effect IO for managing side effects. + * + * ==Example== + * + * {{{ + * import cats.effect.IO + * import java.time.Instant + * + * val dbConfig = JdbcConnectionConfig( + * url = "jdbc:h2:mem:testdb", + * driver = JdbcDriver.H2, + * username = Some("sa"), + * password = Some("") + * ) + * + * val config = DelayedQueueJDBCConfig.create( + * db = dbConfig, + * tableName = "delayed_queue", + * queueName = "my-queue" + * ) + * + * // Run migrations first (once per database) + * DelayedQueueJDBC.runMigrations(config).unsafeRunSync() + * + * // Create and use the queue (using implicit PayloadCodec.forStrings) + * DelayedQueueJDBC[String](config).use { queue => + * for { + * _ <- queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) + * envelope <- queue.poll + * _ <- IO.println(s"Received: $${envelope.payload}") + * _ <- envelope.acknowledge + * } yield () + * } + * }}} + * + * @tparam A + * the type of message payloads + */ +object DelayedQueueJDBC { + + /** Creates a JDBC-based delayed queue with the given configuration. + * + * @tparam A + * the type of message payloads + * @param config + * JDBC queue configuration + * @param codec + * implicit payload codec for serialization (e.g., PayloadCodec.forStrings for String) + * @return + * a Resource that manages the queue lifecycle + */ + def apply[A]( + config: DelayedQueueJDBCConfig + )(using codec: PayloadCodec[A]): Resource[IO, DelayedQueue[A]] = + Dispatcher.sequential[IO].flatMap { dispatcher => + Resource.fromAutoCloseable(IO { + val javaClock = CatsClockToJavaClock(dispatcher) + jvm.DelayedQueueJDBC.create[A]( + codec.asJava, + config.asJava, + javaClock + ) + }).map(jvmQueue => new DelayedQueueWrapper(jvmQueue)) + } + + /** Runs database migrations for the queue. + * + * This should be called once per database before creating queue instances. + * + * @param config + * JDBC queue configuration + * @return + * IO action that runs the migrations + */ + def runMigrations( + config: DelayedQueueJDBCConfig + ): IO[Unit] = + IO.interruptible { + jvm.DelayedQueueJDBC.runMigrations( + config.asJava + ) + } +} diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala new file mode 100644 index 0000000..4eabada --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm +import scala.util.control.NonFatal + +/** Type-class for encoding and decoding message payloads to/from binary data. + * + * This is used by JDBC implementations to store message payloads in the database. + * + * @tparam A + * the type of message payloads + */ +trait PayloadCodec[A] { + + /** Returns the fully-qualified type name of the messages this codec handles. + * + * This is used for queue partitioning and message routing. + * + * @return + * the fully-qualified type name (e.g., "java.lang.String") + */ + def typeName: String + + /** Serializes a payload to a byte array. + * + * @param payload + * the payload to serialize + * @return + * the serialized byte representation + */ + def serialize(payload: A): Array[Byte] + + /** Deserializes a payload from a byte array. + * + * @param serialized + * the serialized bytes + * @return + * Either the deserialized payload or an IllegalArgumentException if parsing fails + */ + def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A] + + /** Converts this Scala PayloadCodec to a JVM MessageSerializer. */ + def asJava: jvm.MessageSerializer[A] = + new jvm.MessageSerializer[A] { + override def getTypeName(): String = PayloadCodec.this.typeName + override def serialize(payload: A): Array[Byte] = PayloadCodec.this.serialize(payload) + override def deserialize(serialized: Array[Byte]): A = + PayloadCodec.this.deserialize(serialized) match { + case Right(value) => value + case Left(error) => throw error + } + } +} + +object PayloadCodec { + + /** Given instance for String payloads using UTF-8 encoding. + * + * This is based on the JVM MessageSerializer.forStrings implementation. + */ + given forStrings: PayloadCodec[String] = + fromJava(jvm.MessageSerializer.forStrings()) + + /** Wraps a JVM MessageSerializer to provide a Scala PayloadCodec interface. */ + def fromJava[A](javaSerializer: jvm.MessageSerializer[A]): PayloadCodec[A] = + new PayloadCodec[A] { + override def typeName: String = javaSerializer.getTypeName() + override def serialize(payload: A): Array[Byte] = javaSerializer.serialize(payload) + override def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A] = + try { + Right(javaSerializer.deserialize(serialized)) + } catch { + case e: IllegalArgumentException => Left(e) + case NonFatal(e) => Left(new IllegalArgumentException(e.getMessage, e)) + } + } +} diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala new file mode 100644 index 0000000..ba79bf5 --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala @@ -0,0 +1,153 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.{IO, Resource} +import cats.syntax.functor.* +import java.time.Instant +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.AckEnvelope.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.BatchedReply.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import scala.jdk.CollectionConverters.* + +/** Wrapper that implements the Scala DelayedQueue trait by delegating to a JVM DelayedQueue + * implementation. + */ +private[scala] class DelayedQueueWrapper[A]( + underlying: jvm.DelayedQueue[A] +) extends DelayedQueue[A] { + + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): IO[List[BatchedReply[In, A]]] = + IO { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = IO.blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: IO[AckEnvelope[A]] = + IO.interruptible(underlying.poll().asScala) + + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) + + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) + + override def cron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) +} + +/** Wrapper for CronService that delegates to the JVM implementation. */ +private[scala] class CronServiceWrapper[A]( + underlying: jvm.CronService[A] +) extends CronService[A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): IO[Unit] = + IO { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.install( + configHash.asJava, + keyPrefix, + scheduleInterval, + now => generateMany(now).map(_.asJava).asJava + ) + }).void + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: (Instant) => CronMessage[A] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installDailySchedule( + keyPrefix, + schedule.asJava, + now => generator(now).asJava + ) + }).void + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: (Instant) => A + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installPeriodicTick( + keyPrefix, + period, + now => generator(now) + ) + }).void +} diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala index ae7b7ee..7a5cbc3 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala @@ -20,9 +20,6 @@ import java.time.Instant import java.time.LocalTime import java.time.ZoneId import java.time.Duration -import org.funfix.delayedqueue.scala.CronConfigHash.asScala -import org.funfix.delayedqueue.scala.CronMessage.asScala as cronAsScala -import org.funfix.delayedqueue.scala.CronDailySchedule.asScala as scheduleAsScala class CronSpec extends munit.FunSuite { diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala index 5858825..d2ef261 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala @@ -21,17 +21,17 @@ import org.scalacheck.Prop.* import org.scalacheck.Gen import scala.concurrent.duration.FiniteDuration import java.time.{Instant, LocalTime, ZoneId} -import org.funfix.delayedqueue.scala.ScheduledMessage.asScala as scheduledAsScala -import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala as timeConfigAsScala -import org.funfix.delayedqueue.scala.MessageId.asScala as messageIdAsScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerOutcomeAsScala -import org.funfix.delayedqueue.scala.Generators.{given, *} +import org.funfix.delayedqueue.scala.ScheduledMessage.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import org.funfix.delayedqueue.scala.MessageId.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.Generators.given class DataStructuresPropertySpec extends ScalaCheckSuite { property("ScheduledMessage asJava/asScala roundtrip preserves data") { forAll { (key: String, payload: String, instant: Instant, canUpdate: Boolean) => val original = ScheduledMessage(key, payload, instant, canUpdate) - val roundtripped = original.asJava.scheduledAsScala + val roundtripped = original.asJava.asScala assertEquals(roundtripped.key, original.key) assertEquals(roundtripped.payload, original.payload) @@ -43,7 +43,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { property("DelayedQueueTimeConfig asJava/asScala roundtrip preserves data") { forAll { (acquireTimeout: FiniteDuration, pollPeriod: FiniteDuration) => val original = DelayedQueueTimeConfig(acquireTimeout, pollPeriod) - val roundtripped = original.asJava.timeConfigAsScala + val roundtripped = original.asJava.asScala assertEquals(roundtripped.acquireTimeout.toMillis, original.acquireTimeout.toMillis) assertEquals(roundtripped.pollPeriod.toMillis, original.pollPeriod.toMillis) @@ -54,7 +54,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { forAll { (value: String) => val messageId = MessageId(value) assertEquals(messageId.value, value) - assertEquals(messageId.asJava.messageIdAsScala.value, value) + assertEquals(messageId.asJava.asScala.value, value) } } @@ -117,7 +117,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { property("OfferOutcome asJava/asScala roundtrip") { forAll(Gen.oneOf(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored)) { outcome => - val roundtripped = outcome.asJava.offerOutcomeAsScala + val roundtripped = outcome.asJava.asScala assertEquals(roundtripped, outcome) } } diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala new file mode 100644 index 0000000..8763018 --- /dev/null +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala @@ -0,0 +1,383 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.syntax.all.* +import cats.effect.IO +import java.time.Instant +import munit.CatsEffectSuite +import scala.concurrent.duration.* + +/** Base test suite for DelayedQueueJDBC with common test cases. */ +abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { + + // Import the given PayloadCodec for String + import PayloadCodec.given + + /** Create a queue configuration for testing. Subclasses override this. */ + def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig + + /** Helper to create a queue with default settings. */ + def createQueue(tableName: String = "delayed_queue", queueName: String = "test-queue") = + DelayedQueueJDBC[String](createConfig(tableName, queueName)) + + /** Helper to run migrations and create a queue with unique table names to ensure test isolation. + */ + def withQueue(test: DelayedQueue[String] => IO[Unit]): IO[Unit] = { + // Use a unique table name for each test to avoid interference + val tableName = s"test_table_${System.nanoTime()}" + val queueName = "test-queue" + val config = createConfig(tableName, queueName) + for { + _ <- DelayedQueueJDBC.runMigrations(config) + _ <- createQueue(tableName, queueName).use(test) + } yield () + } + + test("offerOrUpdate should create a new message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } + } + + test("offerOrUpdate should update an existing message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + result <- queue.offerOrUpdate("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Updated) + } + } + + test("offerIfNotExists should create a new message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } + } + + test("offerIfNotExists should ignore existing message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + result <- queue.offerIfNotExists("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Ignored) + } + } + + test("tryPoll should return None when no messages are available") { + withQueue { queue => + queue.tryPoll.assertEquals(None) + } + } + + test("tryPoll should return a message when scheduled time has passed") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined) + assertEquals(envelope.get.payload, "payload1") + assertEquals(envelope.get.messageId.value, "key1") + } + } yield () + } + } + + test("tryPollMany should return empty list when no messages are available") { + withQueue { queue => + queue.tryPollMany(10).map { envelope => + assertEquals(envelope.payload, List.empty[String]) + } + } + } + + test("tryPollMany should return multiple messages") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.offerOrUpdate("key3", "payload3", scheduleAt) + envelope <- queue.tryPollMany(5) + _ <- IO { + assertEquals(envelope.payload.length, 3) + assertEquals( + envelope.payload.toSet, + Set("payload1", "payload2", "payload3"), + "tryPollMany should return all three messages" + ) + } + } yield () + } + } + + test("offerBatch should handle multiple messages") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + messages = List( + BatchedMessage( + "input1", + ScheduledMessage("key1", "payload1", scheduleAt, canUpdate = true) + ), + BatchedMessage( + "input2", + ScheduledMessage("key2", "payload2", scheduleAt, canUpdate = true) + ) + ) + replies <- queue.offerBatch(messages) + _ <- IO { + assertEquals(replies.length, 2) + assertEquals(replies(0).outcome, OfferOutcome.Created) + assertEquals(replies(1).outcome, OfferOutcome.Created) + } + } yield () + } + } + + test("read should return a message without locking it") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.read("key1") + stillExists <- queue.containsMessage("key1") + _ <- IO { + assert(envelope.isDefined, "envelope should be defined") + assertEquals(envelope.get.payload, "payload1") + assert(stillExists, "message should still exist after read") + } + } yield () + } + } + + test("dropMessage should remove a message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + dropped <- queue.dropMessage("key1") + exists <- queue.containsMessage("key1") + _ <- IO { + assert(dropped, "dropMessage should return true") + assert(!exists, "message should not exist after drop") + } + } yield () + } + } + + test("containsMessage should return true for existing message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + exists <- queue.containsMessage("key1") + _ <- IO(assert(exists, "message should exist")) + } yield () + } + } + + test("containsMessage should return false for non-existing message") { + withQueue { queue => + queue.containsMessage("nonexistent").map { exists => + assert(!exists, "nonexistent message should not exist") + } + } + } + + test("dropAllMessages should remove all messages") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + count <- queue.dropAllMessages("Yes, please, I know what I'm doing!") + exists1 <- queue.containsMessage("key1") + exists2 <- queue.containsMessage("key2") + _ <- IO { + assertEquals(count, 2) + assert(!exists1, "key1 should not exist after dropAll") + assert(!exists2, "key2 should not exist after dropAll") + } + } yield () + } + } + + test("acknowledge should delete the message") { + withQueue { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- { + assert(envelope.isDefined, "envelope should be defined") + envelope.get.acknowledge + } + exists <- queue.containsMessage("key1") + _ <- IO(assert(!exists, "message should be deleted after acknowledgment")) + } yield () + } + } + + test("cron should return a CronService") { + withQueue { queue => + queue.cron.map { cronService => + assert(cronService != null, "cronService should not be null") + } + } + } + + test("getTimeConfig should return the configured time config") { + withQueue { queue => + queue.getTimeConfig.map { config => + assert(config != null, "timeConfig should not be null") + } + } + } + + test("multiple queues can share the same table") { + val tableName = s"shared_table_${System.nanoTime()}" + for { + config1 = createConfig(tableName, "queue1") + config2 = createConfig(tableName, "queue2") + _ <- DelayedQueueJDBC.runMigrations(config1) + _ <- createQueue(tableName, "queue1").use { queue1 => + createQueue(tableName, "queue2").use { queue2 => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue1.offerOrUpdate("key1", "queue1-payload", scheduleAt) + _ <- queue2.offerOrUpdate("key1", "queue2-payload", scheduleAt) + envelope1 <- queue1.tryPoll + envelope2 <- queue2.tryPoll + _ <- IO { + assert(envelope1.isDefined, "queue1 should have a message") + assert(envelope2.isDefined, "queue2 should have a message") + assertEquals(envelope1.get.payload, "queue1-payload") + assertEquals(envelope2.get.payload, "queue2-payload") + } + } yield () + } + } + } yield () + } + + test("concurrency") { + val tableName = s"concurrent_table_${System.nanoTime()}" + val config = createConfig(tableName, "concurrent-queue") + val producers = 2 + val consumers = 2 + val messageCount = 100 // Reduced for JDBC performance + val now = Instant.now() + + assert(messageCount % producers == 0, "messageCount should be divisible by number of producers") + assert(messageCount % consumers == 0, "messageCount should be divisible by number of consumers") + + def producer(queue: DelayedQueue[String], id: Int, count: Int): IO[Int] = + (1 to count).toList.traverse { i => + val key = s"producer-$id-message-$i" + val payload = s"payload-$key" + queue.offerOrUpdate(key, payload, now).map { outcome => + assertEquals(outcome, OfferOutcome.Created) + 1 + } + }.map(_.sum) + + def allProducers(queue: DelayedQueue[String]): IO[Int] = (1 to producers).toList.parTraverse { + id => + producer(queue, id, messageCount / producers) + }.map(_.sum) + + def consumer(queue: DelayedQueue[String], count: Int): IO[Int] = (1 to count).toList.traverse { + _ => + queue.poll.map { envelope => + assert( + envelope.payload.startsWith("payload-"), + "payload should have correct format" + ) + 1 + } + }.map(_.sum) + + def allConsumers(queue: DelayedQueue[String]): IO[Int] = (1 to consumers).toList.parTraverse { + _ => + consumer(queue, messageCount / consumers) + }.map(_.sum) + + for { + _ <- DelayedQueueJDBC.runMigrations(config) + _ <- createQueue(tableName, "concurrent-queue").use { queue => + val test = for { + prodFiber <- allProducers(queue).background + conFiber <- allConsumers(queue).background + } yield (prodFiber, conFiber) + + test.use { case (prodFiber, conFiber) => + for { + p <- prodFiber + p <- p.embedNever + c <- conFiber + c <- c.embedNever + } yield { + assertEquals(p, messageCount) + assertEquals(c, messageCount) + } + } + }.timeout(30.seconds) + } yield () + } +} + +/** H2 database tests for DelayedQueueJDBC. */ +class DelayedQueueJDBCH2Spec extends DelayedQueueJDBCSpec { + // Use a stable database name for the test suite + private val testDbName = s"test_h2_${System.currentTimeMillis()}" + + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { + val dbConfig = JdbcConnectionConfig( + url = s"jdbc:h2:mem:$testDbName;DB_CLOSE_DELAY=-1", + driver = JdbcDriver.H2, + username = Some("sa"), + password = Some("") + ) + DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) + } +} + +/** HSQLDB database tests for DelayedQueueJDBC. */ +class DelayedQueueJDBCHSQLDBSpec extends DelayedQueueJDBCSpec { + private val testDbName = s"test_hsqldb_${System.currentTimeMillis()}" + + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { + val dbConfig = JdbcConnectionConfig( + url = s"jdbc:hsqldb:mem:$testDbName", + driver = JdbcDriver.HSQLDB, + username = Some("SA"), + password = Some("") + ) + DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) + } +} diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala index 055299e..b416e1f 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala @@ -17,7 +17,6 @@ package org.funfix.delayedqueue.scala import java.time.Duration -import org.funfix.delayedqueue.jvm import org.funfix.delayedqueue.scala.RetryConfig.asScala class RetryConfigSpec extends munit.FunSuite { diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala index ffa6515..26ae911 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala @@ -18,7 +18,7 @@ package org.funfix.delayedqueue.scala import java.time.Instant import org.funfix.delayedqueue.scala.ScheduledMessage.asScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerAsScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala class ScheduledMessageSpec extends munit.FunSuite { @@ -63,7 +63,7 @@ class ScheduledMessageSpec extends munit.FunSuite { val outcomes = List(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored) outcomes.foreach { outcome => - val roundtripped = outcome.asJava.offerAsScala + val roundtripped = outcome.asJava.asScala assertEquals(roundtripped, outcome) } }