From 62ce234170e453705f1c3dce949f37f32585f1c8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:02:37 +0000 Subject: [PATCH 1/7] Initial plan From 6ad9fdbe929640f98f4381c43c1656f42ab5cf02 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:06:03 +0000 Subject: [PATCH 2/7] Add DelayedQueueJDBC Scala wrapper and tests Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../delayedqueue/scala/DelayedQueueJDBC.scala | 243 +++++++++++++ .../scala/MessageSerializer.scala | 87 +++++ .../scala/DelayedQueueJDBCSpec.scala | 320 ++++++++++++++++++ 3 files changed, 650 insertions(+) create mode 100644 delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala create mode 100644 delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala create mode 100644 delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala new file mode 100644 index 0000000..3531b99 --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala @@ -0,0 +1,243 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.{IO, Resource} +import java.time.Instant +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.AckEnvelope.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.BatchedReply.asScala +import scala.jdk.CollectionConverters.* + +/** JDBC-based implementation of [[DelayedQueue]] with support for multiple database backends. + * + * This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueJDBC]] and provides + * an idiomatic Scala API with Cats Effect IO for managing side effects. + * + * ==Example== + * + * {{{ + * import cats.effect.IO + * import java.time.Instant + * + * val dbConfig = JdbcConnectionConfig( + * url = "jdbc:h2:mem:testdb", + * driver = JdbcDriver.H2, + * username = Some("sa"), + * password = Some("") + * ) + * + * val config = DelayedQueueJDBCConfig.create( + * db = dbConfig, + * tableName = "delayed_queue", + * queueName = "my-queue" + * ) + * + * // Run migrations first (once per database) + * DelayedQueueJDBC.runMigrations[String](MessageSerializer.forStrings, config).unsafeRunSync() + * + * // Create and use the queue + * DelayedQueueJDBC[String](MessageSerializer.forStrings, config).use { queue => + * for { + * _ <- queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) + * envelope <- queue.poll + * _ <- IO.println(s"Received: $${envelope.payload}") + * _ <- envelope.acknowledge + * } yield () + * } + * }}} + * + * @tparam A + * the type of message payloads + */ +object DelayedQueueJDBC { + + /** Creates a JDBC-based delayed queue with the given configuration. + * + * @tparam A + * the type of message payloads + * @param serializer + * serializer for message payloads + * @param config + * JDBC queue configuration + * @return + * a Resource that manages the queue lifecycle + */ + def apply[A]( + serializer: MessageSerializer[A], + config: DelayedQueueJDBCConfig + ): Resource[IO, DelayedQueue[A]] = + Resource.make( + IO { + val jvmQueue = jvm.DelayedQueueJDBC.create( + serializer.asJava, + config.asJava + ) + new DelayedQueueJDBCWrapper(jvmQueue) + } + )(queue => IO(queue.underlying.close())) + + /** Runs database migrations for the queue. + * + * This should be called once per database before creating queue instances. + * + * @tparam A + * the type of message payloads + * @param serializer + * serializer for message payloads + * @param config + * JDBC queue configuration + * @return + * IO action that runs the migrations + */ + def runMigrations[A]( + serializer: MessageSerializer[A], + config: DelayedQueueJDBCConfig + ): IO[Unit] = + IO { + jvm.DelayedQueueJDBC.runMigrations( + serializer.asJava, + config.asJava + ) + } + + /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. + */ + private class DelayedQueueJDBCWrapper[A]( + val underlying: jvm.DelayedQueueJDBC[A] + ) extends DelayedQueue[A] { + + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): IO[List[BatchedReply[In, A]]] = + IO { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = IO.blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: IO[AckEnvelope[A]] = + IO.interruptible(underlying.poll().asScala) + + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) + + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) + + override def cron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) + } + + /** Wrapper for CronService that delegates to the JVM implementation. */ + private class CronServiceWrapper[A]( + underlying: jvm.CronService[A] + ) extends CronService[A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): IO[Unit] = + IO { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.install( + configHash.asJava, + keyPrefix, + scheduleInterval, + now => generateMany(now).map(_.asJava).asJava + ) + }).void + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: (Instant) => CronMessage[A] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installDailySchedule( + keyPrefix, + schedule.asJava, + now => generator(now).asJava + ) + }).void + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: (Instant) => A + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installPeriodicTick( + keyPrefix, + period, + now => generator(now) + ) + }).void + } +} diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala new file mode 100644 index 0000000..ca80a9b --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Strategy for serializing and deserializing message payloads to/from binary data. + * + * This is used by JDBC implementations to store message payloads in the database. + * + * @tparam A + * the type of message payloads + */ +trait MessageSerializer[A] { + + /** Returns the fully-qualified type name of the messages this serializer handles. + * + * This is used for queue partitioning and message routing. + * + * @return + * the fully-qualified type name (e.g., "java.lang.String") + */ + def getTypeName: String + + /** Serializes a payload to a byte array. + * + * @param payload + * the payload to serialize + * @return + * the serialized byte representation + */ + def serialize(payload: A): Array[Byte] + + /** Deserializes a payload from a byte array. + * + * @param serialized + * the serialized bytes + * @return + * the deserialized payload + * @throws IllegalArgumentException + * if the serialized string cannot be parsed + */ + @throws[IllegalArgumentException] + def deserialize(serialized: Array[Byte]): A + + /** Converts this Scala MessageSerializer to a JVM MessageSerializer. */ + def asJava: jvm.MessageSerializer[A] = + new jvm.MessageSerializer[A] { + override def getTypeName(): String = MessageSerializer.this.getTypeName + override def serialize(payload: A): Array[Byte] = MessageSerializer.this.serialize(payload) + override def deserialize(serialized: Array[Byte]): A = + MessageSerializer.this.deserialize(serialized) + } +} + +object MessageSerializer { + + /** Creates a serializer for String payloads (UTF-8 encoding). */ + def forStrings: MessageSerializer[String] = + new MessageSerializer[String] { + override def getTypeName: String = "java.lang.String" + override def serialize(payload: String): Array[Byte] = payload.getBytes("UTF-8") + override def deserialize(serialized: Array[Byte]): String = new String(serialized, "UTF-8") + } + + /** Wraps a JVM MessageSerializer to provide a Scala interface. */ + def fromJava[A](javaSerializer: jvm.MessageSerializer[A]): MessageSerializer[A] = + new MessageSerializer[A] { + override def getTypeName: String = javaSerializer.getTypeName() + override def serialize(payload: A): Array[Byte] = javaSerializer.serialize(payload) + override def deserialize(serialized: Array[Byte]): A = javaSerializer.deserialize(serialized) + } +} diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala new file mode 100644 index 0000000..b11ecb2 --- /dev/null +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala @@ -0,0 +1,320 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.syntax.all.* +import cats.effect.IO +import java.time.Instant +import munit.CatsEffectSuite +import scala.concurrent.duration.* + +/** Base test suite for DelayedQueueJDBC with common test cases. */ +abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { + + /** Create a queue configuration for testing. Subclasses override this. */ + def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig + + /** Helper to create a queue with default settings. */ + def createQueue(tableName: String = "delayed_queue", queueName: String = "test-queue") = + DelayedQueueJDBC[String](MessageSerializer.forStrings, createConfig(tableName, queueName)) + + /** Helper to run migrations and create a queue. */ + def withQueue( + tableName: String = "delayed_queue", + queueName: String = "test-queue" + )(test: DelayedQueue[String] => IO[Unit]): IO[Unit] = { + val config = createConfig(tableName, queueName) + for { + _ <- DelayedQueueJDBC.runMigrations(MessageSerializer.forStrings, config) + _ <- createQueue(tableName, queueName).use(test) + } yield () + } + + test("offerOrUpdate should create a new message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } + } + + test("offerOrUpdate should update an existing message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + result <- queue.offerOrUpdate("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Updated) + } + } + + test("offerIfNotExists should create a new message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + result <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + } yield assertEquals(result, OfferOutcome.Created) + } + } + + test("offerIfNotExists should ignore existing message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerIfNotExists("key1", "payload1", scheduleAt) + result <- queue.offerIfNotExists("key1", "payload2", scheduleAt.plusSeconds(5)) + } yield assertEquals(result, OfferOutcome.Ignored) + } + } + + test("tryPoll should return None when no messages are available") { + withQueue() { queue => + queue.tryPoll.assertEquals(None) + } + } + + test("tryPoll should return a message when scheduled time has passed") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- IO { + assert(envelope.isDefined) + assertEquals(envelope.get.payload, "payload1") + assertEquals(envelope.get.messageId.value, "key1") + } + } yield () + } + } + + test("tryPollMany should return empty list when no messages are available") { + withQueue() { queue => + queue.tryPollMany(10).map { envelope => + assertEquals(envelope.payload, List.empty[String]) + } + } + } + + test("tryPollMany should return multiple messages") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + _ <- queue.offerOrUpdate("key3", "payload3", scheduleAt) + envelope <- queue.tryPollMany(5) + _ <- IO { + assertEquals(envelope.payload.length, 3) + assertEquals( + envelope.payload.toSet, + Set("payload1", "payload2", "payload3"), + "tryPollMany should return all three messages" + ) + } + } yield () + } + } + + test("offerBatch should handle multiple messages") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + messages = List( + BatchedMessage( + "input1", + ScheduledMessage("key1", "payload1", scheduleAt, canUpdate = true) + ), + BatchedMessage( + "input2", + ScheduledMessage("key2", "payload2", scheduleAt, canUpdate = true) + ) + ) + replies <- queue.offerBatch(messages) + _ <- IO { + assertEquals(replies.length, 2) + assertEquals(replies(0).outcome, OfferOutcome.Created) + assertEquals(replies(1).outcome, OfferOutcome.Created) + } + } yield () + } + } + + test("read should return a message without locking it") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.read("key1") + stillExists <- queue.containsMessage("key1") + _ <- IO { + assert(envelope.isDefined, "envelope should be defined") + assertEquals(envelope.get.payload, "payload1") + assert(stillExists, "message should still exist after read") + } + } yield () + } + } + + test("dropMessage should remove a message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + dropped <- queue.dropMessage("key1") + exists <- queue.containsMessage("key1") + _ <- IO { + assert(dropped, "dropMessage should return true") + assert(!exists, "message should not exist after drop") + } + } yield () + } + } + + test("containsMessage should return true for existing message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + exists <- queue.containsMessage("key1") + _ <- IO(assert(exists, "message should exist")) + } yield () + } + } + + test("containsMessage should return false for non-existing message") { + withQueue() { queue => + queue.containsMessage("nonexistent").map { exists => + assert(!exists, "nonexistent message should not exist") + } + } + } + + test("dropAllMessages should remove all messages") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().plusSeconds(10)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + _ <- queue.offerOrUpdate("key2", "payload2", scheduleAt) + count <- queue.dropAllMessages("Yes, please, I know what I'm doing!") + exists1 <- queue.containsMessage("key1") + exists2 <- queue.containsMessage("key2") + _ <- IO { + assertEquals(count, 2) + assert(!exists1, "key1 should not exist after dropAll") + assert(!exists2, "key2 should not exist after dropAll") + } + } yield () + } + } + + test("acknowledge should delete the message") { + withQueue() { queue => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) + envelope <- queue.tryPoll + _ <- { + assert(envelope.isDefined, "envelope should be defined") + envelope.get.acknowledge + } + exists <- queue.containsMessage("key1") + _ <- IO(assert(!exists, "message should be deleted after acknowledgment")) + } yield () + } + } + + test("cron should return a CronService") { + withQueue() { queue => + queue.cron.map { cronService => + assert(cronService != null, "cronService should not be null") + } + } + } + + test("getTimeConfig should return the configured time config") { + withQueue() { queue => + queue.getTimeConfig.map { config => + assert(config != null, "timeConfig should not be null") + } + } + } + + test("multiple queues can share the same table") { + val tableName = "shared_table" + for { + config1 = createConfig(tableName, "queue1") + config2 = createConfig(tableName, "queue2") + _ <- DelayedQueueJDBC.runMigrations(MessageSerializer.forStrings, config1) + _ <- createQueue(tableName, "queue1").use { queue1 => + createQueue(tableName, "queue2").use { queue2 => + for { + scheduleAt <- IO(Instant.now().minusSeconds(1)) + _ <- queue1.offerOrUpdate("key1", "queue1-payload", scheduleAt) + _ <- queue2.offerOrUpdate("key1", "queue2-payload", scheduleAt) + envelope1 <- queue1.tryPoll + envelope2 <- queue2.tryPoll + _ <- IO { + assert(envelope1.isDefined, "queue1 should have a message") + assert(envelope2.isDefined, "queue2 should have a message") + assertEquals(envelope1.get.payload, "queue1-payload") + assertEquals(envelope2.get.payload, "queue2-payload") + } + } yield () + } + } + } yield () + } +} + +/** H2 database tests for DelayedQueueJDBC. */ +class DelayedQueueJDBCH2Spec extends DelayedQueueJDBCSpec { + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { + val dbConfig = JdbcConnectionConfig( + url = s"jdbc:h2:mem:test_${java.util.UUID.randomUUID()};DB_CLOSE_DELAY=-1", + driver = JdbcDriver.H2, + username = Some("sa"), + password = Some("") + ) + DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) + } +} + +/** SQLite database tests for DelayedQueueJDBC. */ +class DelayedQueueJDBCSQLiteSpec extends DelayedQueueJDBCSpec { + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { + val dbConfig = JdbcConnectionConfig( + url = s"jdbc:sqlite::memory:", + driver = JdbcDriver.SQLite + ) + DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) + } +} + +/** HSQLDB database tests for DelayedQueueJDBC. */ +class DelayedQueueJDBCHSQLDBSpec extends DelayedQueueJDBCSpec { + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { + val dbConfig = JdbcConnectionConfig( + url = s"jdbc:hsqldb:mem:test_${java.util.UUID.randomUUID()}", + driver = JdbcDriver.HSQLDB, + username = Some("SA"), + password = Some("") + ) + DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) + } +} From 93b5278c0bf67bead18ada30afaeebeacdf10e34 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:18:45 +0000 Subject: [PATCH 3/7] Fix compilation and test issues, format code - Added missing imports (cats.syntax.functor.*) - Fixed runMigrations signature (doesn't need serializer) - Added JDBC driver test dependencies to build.sbt - Fixed test isolation by using unique table names - Fixed database configuration for H2 and HSQLDB - Formatted code with scalafmt - All H2 and HSQLDB tests passing (36/36) Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- build.sbt | 4 ++ .../delayedqueue/scala/DelayedQueueJDBC.scala | 14 ++-- .../scala/DelayedQueueJDBCSpec.scala | 67 +++++++++++-------- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/build.sbt b/build.sbt index 40590b3..f65e01e 100644 --- a/build.sbt +++ b/build.sbt @@ -77,5 +77,9 @@ lazy val delayedqueueJVM = project "org.typelevel" %% "cats-effect-testkit" % "3.6.3" % Test, "org.scalacheck" %% "scalacheck" % "1.19.0" % Test, "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, + // JDBC drivers for testing + "com.h2database" % "h2" % "2.4.240" % Test, + "org.hsqldb" % "hsqldb" % "2.7.4" % Test, + "org.xerial" % "sqlite-jdbc" % "3.51.1.0" % Test, ) ) diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala index 3531b99..87d0f52 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala @@ -17,11 +17,13 @@ package org.funfix.delayedqueue.scala import cats.effect.{IO, Resource} +import cats.syntax.functor.* import java.time.Instant import org.funfix.delayedqueue.jvm import org.funfix.delayedqueue.scala.AckEnvelope.asScala import org.funfix.delayedqueue.scala.OfferOutcome.asScala import org.funfix.delayedqueue.scala.BatchedReply.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala import scala.jdk.CollectionConverters.* /** JDBC-based implementation of [[DelayedQueue]] with support for multiple database backends. @@ -49,7 +51,7 @@ import scala.jdk.CollectionConverters.* * ) * * // Run migrations first (once per database) - * DelayedQueueJDBC.runMigrations[String](MessageSerializer.forStrings, config).unsafeRunSync() + * DelayedQueueJDBC.runMigrations(config).unsafeRunSync() * * // Create and use the queue * DelayedQueueJDBC[String](MessageSerializer.forStrings, config).use { queue => @@ -84,7 +86,7 @@ object DelayedQueueJDBC { ): Resource[IO, DelayedQueue[A]] = Resource.make( IO { - val jvmQueue = jvm.DelayedQueueJDBC.create( + val jvmQueue = jvm.DelayedQueueJDBC.create[A]( serializer.asJava, config.asJava ) @@ -96,22 +98,16 @@ object DelayedQueueJDBC { * * This should be called once per database before creating queue instances. * - * @tparam A - * the type of message payloads - * @param serializer - * serializer for message payloads * @param config * JDBC queue configuration * @return * IO action that runs the migrations */ - def runMigrations[A]( - serializer: MessageSerializer[A], + def runMigrations( config: DelayedQueueJDBCConfig ): IO[Unit] = IO { jvm.DelayedQueueJDBC.runMigrations( - serializer.asJava, config.asJava ) } diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala index b11ecb2..2952965 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala @@ -32,20 +32,21 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { def createQueue(tableName: String = "delayed_queue", queueName: String = "test-queue") = DelayedQueueJDBC[String](MessageSerializer.forStrings, createConfig(tableName, queueName)) - /** Helper to run migrations and create a queue. */ - def withQueue( - tableName: String = "delayed_queue", - queueName: String = "test-queue" - )(test: DelayedQueue[String] => IO[Unit]): IO[Unit] = { + /** Helper to run migrations and create a queue with unique table names to ensure test isolation. + */ + def withQueue(test: DelayedQueue[String] => IO[Unit]): IO[Unit] = { + // Use a unique table name for each test to avoid interference + val tableName = s"test_table_${System.nanoTime()}" + val queueName = "test-queue" val config = createConfig(tableName, queueName) for { - _ <- DelayedQueueJDBC.runMigrations(MessageSerializer.forStrings, config) + _ <- DelayedQueueJDBC.runMigrations(config) _ <- createQueue(tableName, queueName).use(test) } yield () } test("offerOrUpdate should create a new message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) result <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -54,7 +55,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("offerOrUpdate should update an existing message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -64,7 +65,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("offerIfNotExists should create a new message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) result <- queue.offerIfNotExists("key1", "payload1", scheduleAt) @@ -73,7 +74,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("offerIfNotExists should ignore existing message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) _ <- queue.offerIfNotExists("key1", "payload1", scheduleAt) @@ -83,13 +84,13 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("tryPoll should return None when no messages are available") { - withQueue() { queue => + withQueue { queue => queue.tryPoll.assertEquals(None) } } test("tryPoll should return a message when scheduled time has passed") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().minusSeconds(1)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -104,7 +105,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("tryPollMany should return empty list when no messages are available") { - withQueue() { queue => + withQueue { queue => queue.tryPollMany(10).map { envelope => assertEquals(envelope.payload, List.empty[String]) } @@ -112,7 +113,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("tryPollMany should return multiple messages") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().minusSeconds(1)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -132,7 +133,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("offerBatch should handle multiple messages") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) messages = List( @@ -156,7 +157,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("read should return a message without locking it") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -172,7 +173,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("dropMessage should remove a message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -187,7 +188,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("containsMessage should return true for existing message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -198,7 +199,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("containsMessage should return false for non-existing message") { - withQueue() { queue => + withQueue { queue => queue.containsMessage("nonexistent").map { exists => assert(!exists, "nonexistent message should not exist") } @@ -206,7 +207,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("dropAllMessages should remove all messages") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().plusSeconds(10)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -224,7 +225,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("acknowledge should delete the message") { - withQueue() { queue => + withQueue { queue => for { scheduleAt <- IO(Instant.now().minusSeconds(1)) _ <- queue.offerOrUpdate("key1", "payload1", scheduleAt) @@ -240,7 +241,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("cron should return a CronService") { - withQueue() { queue => + withQueue { queue => queue.cron.map { cronService => assert(cronService != null, "cronService should not be null") } @@ -248,7 +249,7 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("getTimeConfig should return the configured time config") { - withQueue() { queue => + withQueue { queue => queue.getTimeConfig.map { config => assert(config != null, "timeConfig should not be null") } @@ -256,11 +257,11 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } test("multiple queues can share the same table") { - val tableName = "shared_table" + val tableName = s"shared_table_${System.nanoTime()}" for { config1 = createConfig(tableName, "queue1") config2 = createConfig(tableName, "queue2") - _ <- DelayedQueueJDBC.runMigrations(MessageSerializer.forStrings, config1) + _ <- DelayedQueueJDBC.runMigrations(config1) _ <- createQueue(tableName, "queue1").use { queue1 => createQueue(tableName, "queue2").use { queue2 => for { @@ -284,9 +285,12 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { /** H2 database tests for DelayedQueueJDBC. */ class DelayedQueueJDBCH2Spec extends DelayedQueueJDBCSpec { + // Use a stable database name for the test suite + private val testDbName = s"test_h2_${System.currentTimeMillis()}" + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { val dbConfig = JdbcConnectionConfig( - url = s"jdbc:h2:mem:test_${java.util.UUID.randomUUID()};DB_CLOSE_DELAY=-1", + url = s"jdbc:h2:mem:$testDbName;DB_CLOSE_DELAY=-1", driver = JdbcDriver.H2, username = Some("sa"), password = Some("") @@ -297,10 +301,13 @@ class DelayedQueueJDBCH2Spec extends DelayedQueueJDBCSpec { /** SQLite database tests for DelayedQueueJDBC. */ class DelayedQueueJDBCSQLiteSpec extends DelayedQueueJDBCSpec { + // SQLite needs special mode for shared cache in-memory database + private val testDbName = s"test_sqlite_${System.currentTimeMillis()}" + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { val dbConfig = JdbcConnectionConfig( - url = s"jdbc:sqlite::memory:", - driver = JdbcDriver.SQLite + url = s"jdbc:sqlite:file:$testDbName?mode=memory&cache=shared", + driver = JdbcDriver.Sqlite ) DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) } @@ -308,9 +315,11 @@ class DelayedQueueJDBCSQLiteSpec extends DelayedQueueJDBCSpec { /** HSQLDB database tests for DelayedQueueJDBC. */ class DelayedQueueJDBCHSQLDBSpec extends DelayedQueueJDBCSpec { + private val testDbName = s"test_hsqldb_${System.currentTimeMillis()}" + override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { val dbConfig = JdbcConnectionConfig( - url = s"jdbc:hsqldb:mem:test_${java.util.UUID.randomUUID()}", + url = s"jdbc:hsqldb:mem:$testDbName", driver = JdbcDriver.HSQLDB, username = Some("SA"), password = Some("") From e77efdba7ec2e9d1993fed097df537a4fc4226a9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:47:30 +0000 Subject: [PATCH 4/7] Refactor to use PayloadCodec type-class and shared wrappers - Renamed MessageSerializer to PayloadCodec as a proper type-class - Changed deserialize to return Either[IllegalArgumentException, A] - Made forStrings a given instance based on JVM MessageSerializer.forStrings - Extracted shared DelayedQueueWrapper and CronServiceWrapper to internal.scala - Updated DelayedQueueJDBC to use implicit PayloadCodec parameter - Use Resource.fromAutoCloseable instead of Resource.make - Inject JavaClock like in DelayedQueueInMemory using Dispatcher - Use IO.interruptible for runMigrations - Added concurrency test (reduced to 100 messages for JDBC performance) - All 38 H2 and HSQLDB tests passing Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../scala/DelayedQueueInMemory.scala | 129 +------------- .../delayedqueue/scala/DelayedQueueJDBC.scala | 164 ++---------------- ...ageSerializer.scala => PayloadCodec.scala} | 54 +++--- .../funfix/delayedqueue/scala/internal.scala | 157 +++++++++++++++++ .../scala/DelayedQueueJDBCSpec.scala | 70 +++++++- 5 files changed, 275 insertions(+), 299 deletions(-) rename delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/{MessageSerializer.scala => PayloadCodec.scala} (53%) create mode 100644 delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala index 87f46c3..3fee738 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -92,137 +92,12 @@ object DelayedQueueInMemory { ackEnvSource, javaClock ) - new DelayedQueueInMemoryWrapper(jvmQueue) + new internal.DelayedQueueWrapper(jvmQueue) } } - - /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. - */ - private class DelayedQueueInMemoryWrapper[A]( - underlying: jvm.DelayedQueueInMemory[A] - ) extends DelayedQueue[A] { - - override def getTimeConfig: IO[DelayedQueueTimeConfig] = - IO(underlying.getTimeConfig.asScala) - - override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) - - override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) - - override def offerBatch[In]( - messages: List[BatchedMessage[In, A]] - ): IO[List[BatchedReply[In, A]]] = - IO { - val javaMessages = messages.map(_.asJava).asJava - val javaReplies = underlying.offerBatch(javaMessages) - javaReplies.asScala.toList.map(_.asScala) - } - - override def tryPoll: IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.tryPoll()).map(_.asScala) - } - - override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = - IO { - val javaEnvelope = underlying.tryPollMany(batchMaxSize) - AckEnvelope( - payload = javaEnvelope.payload.asScala.toList, - messageId = MessageId.asScala(javaEnvelope.messageId), - timestamp = javaEnvelope.timestamp, - source = javaEnvelope.source, - deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), - acknowledge = IO.blocking(javaEnvelope.acknowledge()) - ) - } - - override def poll: IO[AckEnvelope[A]] = - IO.interruptible(underlying.poll().asScala) - - override def read(key: String): IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.read(key)).map(_.asScala) - } - - override def dropMessage(key: String): IO[Boolean] = - IO(underlying.dropMessage(key)) - - override def containsMessage(key: String): IO[Boolean] = - IO(underlying.containsMessage(key)) - - override def dropAllMessages(confirm: String): IO[Int] = - IO(underlying.dropAllMessages(confirm)) - - override def cron: IO[CronService[A]] = - IO(new CronServiceWrapper(underlying.getCron)) - } - - /** Wrapper for CronService that delegates to the JVM implementation. */ - private class CronServiceWrapper[A]( - underlying: jvm.CronService[A] - ) extends CronService[A] { - - override def installTick( - configHash: CronConfigHash, - keyPrefix: String, - messages: List[CronMessage[A]] - ): IO[Unit] = - IO { - val javaMessages = messages.map(_.asJava).asJava - underlying.installTick(configHash.asJava, keyPrefix, javaMessages) - } - - override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = - IO { - underlying.uninstallTick(configHash.asJava, keyPrefix) - } - - override def install( - configHash: CronConfigHash, - keyPrefix: String, - scheduleInterval: java.time.Duration, - generateMany: (Instant) => List[CronMessage[A]] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.install( - configHash.asJava, - keyPrefix, - scheduleInterval, - now => generateMany(now).map(_.asJava).asJava - ) - }).void - - override def installDailySchedule( - keyPrefix: String, - schedule: CronDailySchedule, - generator: (Instant) => CronMessage[A] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installDailySchedule( - keyPrefix, - schedule.asJava, - now => generator(now).asJava - ) - }).void - - override def installPeriodicTick( - keyPrefix: String, - period: java.time.Duration, - generator: (Instant) => A - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installPeriodicTick( - keyPrefix, - period, - now => generator(now) - ) - }).void - } } -private final class CatsClockToJavaClock( +private[scala] final class CatsClockToJavaClock( dispatcher: Dispatcher[IO], zone: java.time.ZoneId = java.time.ZoneId.systemDefault() )(using Clock[IO]) extends JavaClock { diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala index 87d0f52..48b22f5 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala @@ -16,15 +16,10 @@ package org.funfix.delayedqueue.scala -import cats.effect.{IO, Resource} +import cats.effect.{IO, Resource, Clock} import cats.syntax.functor.* -import java.time.Instant +import cats.effect.std.Dispatcher import org.funfix.delayedqueue.jvm -import org.funfix.delayedqueue.scala.AckEnvelope.asScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala -import org.funfix.delayedqueue.scala.BatchedReply.asScala -import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala -import scala.jdk.CollectionConverters.* /** JDBC-based implementation of [[DelayedQueue]] with support for multiple database backends. * @@ -53,8 +48,8 @@ import scala.jdk.CollectionConverters.* * // Run migrations first (once per database) * DelayedQueueJDBC.runMigrations(config).unsafeRunSync() * - * // Create and use the queue - * DelayedQueueJDBC[String](MessageSerializer.forStrings, config).use { queue => + * // Create and use the queue (using implicit PayloadCodec.forStrings) + * DelayedQueueJDBC[String](config).use { queue => * for { * _ <- queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10)) * envelope <- queue.poll @@ -73,26 +68,26 @@ object DelayedQueueJDBC { * * @tparam A * the type of message payloads - * @param serializer - * serializer for message payloads * @param config * JDBC queue configuration + * @param codec + * implicit payload codec for serialization (e.g., PayloadCodec.forStrings for String) * @return * a Resource that manages the queue lifecycle */ def apply[A]( - serializer: MessageSerializer[A], config: DelayedQueueJDBCConfig - ): Resource[IO, DelayedQueue[A]] = - Resource.make( - IO { - val jvmQueue = jvm.DelayedQueueJDBC.create[A]( - serializer.asJava, - config.asJava + )(using codec: PayloadCodec[A]): Resource[IO, DelayedQueue[A]] = + Dispatcher.sequential[IO].flatMap { dispatcher => + Resource.fromAutoCloseable(IO { + val javaClock = CatsClockToJavaClock(dispatcher) + jvm.DelayedQueueJDBC.create[A]( + codec.asJava, + config.asJava, + javaClock ) - new DelayedQueueJDBCWrapper(jvmQueue) - } - )(queue => IO(queue.underlying.close())) + }).map(jvmQueue => new internal.DelayedQueueWrapper(jvmQueue)) + } /** Runs database migrations for the queue. * @@ -106,134 +101,9 @@ object DelayedQueueJDBC { def runMigrations( config: DelayedQueueJDBCConfig ): IO[Unit] = - IO { + IO.interruptible { jvm.DelayedQueueJDBC.runMigrations( config.asJava ) } - - /** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation. - */ - private class DelayedQueueJDBCWrapper[A]( - val underlying: jvm.DelayedQueueJDBC[A] - ) extends DelayedQueue[A] { - - override def getTimeConfig: IO[DelayedQueueTimeConfig] = - IO(underlying.getTimeConfig.asScala) - - override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) - - override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) - - override def offerBatch[In]( - messages: List[BatchedMessage[In, A]] - ): IO[List[BatchedReply[In, A]]] = - IO { - val javaMessages = messages.map(_.asJava).asJava - val javaReplies = underlying.offerBatch(javaMessages) - javaReplies.asScala.toList.map(_.asScala) - } - - override def tryPoll: IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.tryPoll()).map(_.asScala) - } - - override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = - IO { - val javaEnvelope = underlying.tryPollMany(batchMaxSize) - AckEnvelope( - payload = javaEnvelope.payload.asScala.toList, - messageId = MessageId.asScala(javaEnvelope.messageId), - timestamp = javaEnvelope.timestamp, - source = javaEnvelope.source, - deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), - acknowledge = IO.blocking(javaEnvelope.acknowledge()) - ) - } - - override def poll: IO[AckEnvelope[A]] = - IO.interruptible(underlying.poll().asScala) - - override def read(key: String): IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.read(key)).map(_.asScala) - } - - override def dropMessage(key: String): IO[Boolean] = - IO(underlying.dropMessage(key)) - - override def containsMessage(key: String): IO[Boolean] = - IO(underlying.containsMessage(key)) - - override def dropAllMessages(confirm: String): IO[Int] = - IO(underlying.dropAllMessages(confirm)) - - override def cron: IO[CronService[A]] = - IO(new CronServiceWrapper(underlying.getCron)) - } - - /** Wrapper for CronService that delegates to the JVM implementation. */ - private class CronServiceWrapper[A]( - underlying: jvm.CronService[A] - ) extends CronService[A] { - - override def installTick( - configHash: CronConfigHash, - keyPrefix: String, - messages: List[CronMessage[A]] - ): IO[Unit] = - IO { - val javaMessages = messages.map(_.asJava).asJava - underlying.installTick(configHash.asJava, keyPrefix, javaMessages) - } - - override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = - IO { - underlying.uninstallTick(configHash.asJava, keyPrefix) - } - - override def install( - configHash: CronConfigHash, - keyPrefix: String, - scheduleInterval: java.time.Duration, - generateMany: (Instant) => List[CronMessage[A]] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.install( - configHash.asJava, - keyPrefix, - scheduleInterval, - now => generateMany(now).map(_.asJava).asJava - ) - }).void - - override def installDailySchedule( - keyPrefix: String, - schedule: CronDailySchedule, - generator: (Instant) => CronMessage[A] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installDailySchedule( - keyPrefix, - schedule.asJava, - now => generator(now).asJava - ) - }).void - - override def installPeriodicTick( - keyPrefix: String, - period: java.time.Duration, - generator: (Instant) => A - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installPeriodicTick( - keyPrefix, - period, - now => generator(now) - ) - }).void - } } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala similarity index 53% rename from delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala rename to delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala index ca80a9b..1525cd3 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/MessageSerializer.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala @@ -17,17 +17,18 @@ package org.funfix.delayedqueue.scala import org.funfix.delayedqueue.jvm +import scala.util.control.NonFatal -/** Strategy for serializing and deserializing message payloads to/from binary data. +/** Type-class for encoding and decoding message payloads to/from binary data. * * This is used by JDBC implementations to store message payloads in the database. * * @tparam A * the type of message payloads */ -trait MessageSerializer[A] { +trait PayloadCodec[A] { - /** Returns the fully-qualified type name of the messages this serializer handles. + /** Returns the fully-qualified type name of the messages this codec handles. * * This is used for queue partitioning and message routing. * @@ -50,38 +51,43 @@ trait MessageSerializer[A] { * @param serialized * the serialized bytes * @return - * the deserialized payload - * @throws IllegalArgumentException - * if the serialized string cannot be parsed + * Either the deserialized payload or an IllegalArgumentException if parsing fails */ - @throws[IllegalArgumentException] - def deserialize(serialized: Array[Byte]): A + def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A] - /** Converts this Scala MessageSerializer to a JVM MessageSerializer. */ + /** Converts this Scala PayloadCodec to a JVM MessageSerializer. */ def asJava: jvm.MessageSerializer[A] = new jvm.MessageSerializer[A] { - override def getTypeName(): String = MessageSerializer.this.getTypeName - override def serialize(payload: A): Array[Byte] = MessageSerializer.this.serialize(payload) + override def getTypeName(): String = PayloadCodec.this.getTypeName + override def serialize(payload: A): Array[Byte] = PayloadCodec.this.serialize(payload) override def deserialize(serialized: Array[Byte]): A = - MessageSerializer.this.deserialize(serialized) + PayloadCodec.this.deserialize(serialized) match { + case Right(value) => value + case Left(error) => throw error + } } } -object MessageSerializer { +object PayloadCodec { - /** Creates a serializer for String payloads (UTF-8 encoding). */ - def forStrings: MessageSerializer[String] = - new MessageSerializer[String] { - override def getTypeName: String = "java.lang.String" - override def serialize(payload: String): Array[Byte] = payload.getBytes("UTF-8") - override def deserialize(serialized: Array[Byte]): String = new String(serialized, "UTF-8") - } + /** Given instance for String payloads using UTF-8 encoding. + * + * This is based on the JVM MessageSerializer.forStrings implementation. + */ + given forStrings: PayloadCodec[String] = + fromJava(jvm.MessageSerializer.forStrings()) - /** Wraps a JVM MessageSerializer to provide a Scala interface. */ - def fromJava[A](javaSerializer: jvm.MessageSerializer[A]): MessageSerializer[A] = - new MessageSerializer[A] { + /** Wraps a JVM MessageSerializer to provide a Scala PayloadCodec interface. */ + def fromJava[A](javaSerializer: jvm.MessageSerializer[A]): PayloadCodec[A] = + new PayloadCodec[A] { override def getTypeName: String = javaSerializer.getTypeName() override def serialize(payload: A): Array[Byte] = javaSerializer.serialize(payload) - override def deserialize(serialized: Array[Byte]): A = javaSerializer.deserialize(serialized) + override def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A] = + try { + Right(javaSerializer.deserialize(serialized)) + } catch { + case e: IllegalArgumentException => Left(e) + case NonFatal(e) => Left(new IllegalArgumentException(e.getMessage, e)) + } } } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala new file mode 100644 index 0000000..c5b4570 --- /dev/null +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala @@ -0,0 +1,157 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.{IO, Resource} +import cats.syntax.functor.* +import java.time.Instant +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.AckEnvelope.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.BatchedReply.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import scala.jdk.CollectionConverters.* + +/** Internal wrappers shared between DelayedQueueInMemory and DelayedQueueJDBC implementations. */ +private[scala] object internal { + + /** Wrapper that implements the Scala DelayedQueue trait by delegating to a JVM DelayedQueue + * implementation. + */ + private[scala] class DelayedQueueWrapper[A]( + underlying: jvm.DelayedQueue[A] + ) extends DelayedQueue[A] { + + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): IO[List[BatchedReply[In, A]]] = + IO { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = IO.blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: IO[AckEnvelope[A]] = + IO.interruptible(underlying.poll().asScala) + + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) + + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) + + override def cron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) + } + + /** Wrapper for CronService that delegates to the JVM implementation. */ + private[scala] class CronServiceWrapper[A]( + underlying: jvm.CronService[A] + ) extends CronService[A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): IO[Unit] = + IO { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.install( + configHash.asJava, + keyPrefix, + scheduleInterval, + now => generateMany(now).map(_.asJava).asJava + ) + }).void + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: (Instant) => CronMessage[A] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installDailySchedule( + keyPrefix, + schedule.asJava, + now => generator(now).asJava + ) + }).void + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: (Instant) => A + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installPeriodicTick( + keyPrefix, + period, + now => generator(now) + ) + }).void + } +} diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala index 2952965..fdde768 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala @@ -25,12 +25,15 @@ import scala.concurrent.duration.* /** Base test suite for DelayedQueueJDBC with common test cases. */ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { + // Import the given PayloadCodec for String + import PayloadCodec.given + /** Create a queue configuration for testing. Subclasses override this. */ def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig /** Helper to create a queue with default settings. */ def createQueue(tableName: String = "delayed_queue", queueName: String = "test-queue") = - DelayedQueueJDBC[String](MessageSerializer.forStrings, createConfig(tableName, queueName)) + DelayedQueueJDBC[String](createConfig(tableName, queueName)) /** Helper to run migrations and create a queue with unique table names to ensure test isolation. */ @@ -281,6 +284,71 @@ abstract class DelayedQueueJDBCSpec extends CatsEffectSuite { } } yield () } + + test("concurrency") { + val tableName = s"concurrent_table_${System.nanoTime()}" + val config = createConfig(tableName, "concurrent-queue") + val producers = 2 + val consumers = 2 + val messageCount = 100 // Reduced for JDBC performance + val now = Instant.now() + + assert(messageCount % producers == 0, "messageCount should be divisible by number of producers") + assert(messageCount % consumers == 0, "messageCount should be divisible by number of consumers") + + def producer(queue: DelayedQueue[String], id: Int, count: Int): IO[Int] = + (1 to count).toList.traverse { i => + val key = s"producer-$id-message-$i" + val payload = s"payload-$key" + queue.offerOrUpdate(key, payload, now).map { outcome => + assertEquals(outcome, OfferOutcome.Created) + 1 + } + }.map(_.sum) + + def allProducers(queue: DelayedQueue[String]): IO[Int] = (1 to producers).toList.parTraverse { + id => + producer(queue, id, messageCount / producers) + }.map(_.sum) + + def consumer(queue: DelayedQueue[String], count: Int): IO[Int] = (1 to count).toList.traverse { + _ => + queue.poll.map { envelope => + assert( + envelope.payload.startsWith("payload-"), + "payload should have correct format" + ) + 1 + } + }.map(_.sum) + + def allConsumers(queue: DelayedQueue[String]): IO[Int] = (1 to consumers).toList.parTraverse { + _ => + consumer(queue, messageCount / consumers) + }.map(_.sum) + + for { + _ <- DelayedQueueJDBC.runMigrations(config) + _ <- createQueue(tableName, "concurrent-queue").use { queue => + val test = for { + prodFiber <- allProducers(queue).background + conFiber <- allConsumers(queue).background + } yield (prodFiber, conFiber) + + test.use { case (prodFiber, conFiber) => + for { + p <- prodFiber + p <- p.embedNever + c <- conFiber + c <- c.embedNever + } yield { + assertEquals(p, messageCount) + assertEquals(c, messageCount) + } + } + }.timeout(30.seconds) + } yield () + } } /** H2 database tests for DelayedQueueJDBC. */ From 5fbdb4f01216225800d55889d9c8d86e339b4a16 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 9 Feb 2026 15:28:05 +0000 Subject: [PATCH 5/7] Fix compilation warnings and refactor internal wrappers - Moved wrappers from object internal to top-level private[scala] classes - Renamed getTypeName to typeName (remove Java-ism) - Removed all unused imports in DelayedQueueInMemory and DelayedQueueJDBC - All compilation warnings fixed - All 38 H2/HSQLDB tests passing Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../scala/DelayedQueueInMemory.scala | 8 +- .../delayedqueue/scala/DelayedQueueJDBC.scala | 5 +- .../delayedqueue/scala/PayloadCodec.scala | 6 +- .../funfix/delayedqueue/scala/internal.scala | 252 +++++++++--------- 4 files changed, 130 insertions(+), 141 deletions(-) diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala index 3fee738..3fc449b 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueInMemory.scala @@ -17,14 +17,8 @@ package org.funfix.delayedqueue.scala import cats.effect.{IO, Resource, Clock} -import cats.syntax.functor.* import java.time.{Clock as JavaClock, Instant} import org.funfix.delayedqueue.jvm -import org.funfix.delayedqueue.scala.AckEnvelope.asScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala -import org.funfix.delayedqueue.scala.BatchedReply.asScala -import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala -import scala.jdk.CollectionConverters.* import cats.effect.std.Dispatcher /** In-memory implementation of [[DelayedQueue]] using concurrent data structures. @@ -92,7 +86,7 @@ object DelayedQueueInMemory { ackEnvSource, javaClock ) - new internal.DelayedQueueWrapper(jvmQueue) + new DelayedQueueWrapper(jvmQueue) } } } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala index 48b22f5..7e880bb 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBC.scala @@ -16,8 +16,7 @@ package org.funfix.delayedqueue.scala -import cats.effect.{IO, Resource, Clock} -import cats.syntax.functor.* +import cats.effect.{IO, Resource} import cats.effect.std.Dispatcher import org.funfix.delayedqueue.jvm @@ -86,7 +85,7 @@ object DelayedQueueJDBC { config.asJava, javaClock ) - }).map(jvmQueue => new internal.DelayedQueueWrapper(jvmQueue)) + }).map(jvmQueue => new DelayedQueueWrapper(jvmQueue)) } /** Runs database migrations for the queue. diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala index 1525cd3..4eabada 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/PayloadCodec.scala @@ -35,7 +35,7 @@ trait PayloadCodec[A] { * @return * the fully-qualified type name (e.g., "java.lang.String") */ - def getTypeName: String + def typeName: String /** Serializes a payload to a byte array. * @@ -58,7 +58,7 @@ trait PayloadCodec[A] { /** Converts this Scala PayloadCodec to a JVM MessageSerializer. */ def asJava: jvm.MessageSerializer[A] = new jvm.MessageSerializer[A] { - override def getTypeName(): String = PayloadCodec.this.getTypeName + override def getTypeName(): String = PayloadCodec.this.typeName override def serialize(payload: A): Array[Byte] = PayloadCodec.this.serialize(payload) override def deserialize(serialized: Array[Byte]): A = PayloadCodec.this.deserialize(serialized) match { @@ -80,7 +80,7 @@ object PayloadCodec { /** Wraps a JVM MessageSerializer to provide a Scala PayloadCodec interface. */ def fromJava[A](javaSerializer: jvm.MessageSerializer[A]): PayloadCodec[A] = new PayloadCodec[A] { - override def getTypeName: String = javaSerializer.getTypeName() + override def typeName: String = javaSerializer.getTypeName() override def serialize(payload: A): Array[Byte] = javaSerializer.serialize(payload) override def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A] = try { diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala index c5b4570..ba79bf5 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/internal.scala @@ -26,132 +26,128 @@ import org.funfix.delayedqueue.scala.BatchedReply.asScala import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala import scala.jdk.CollectionConverters.* -/** Internal wrappers shared between DelayedQueueInMemory and DelayedQueueJDBC implementations. */ -private[scala] object internal { - - /** Wrapper that implements the Scala DelayedQueue trait by delegating to a JVM DelayedQueue - * implementation. - */ - private[scala] class DelayedQueueWrapper[A]( - underlying: jvm.DelayedQueue[A] - ) extends DelayedQueue[A] { - - override def getTimeConfig: IO[DelayedQueueTimeConfig] = - IO(underlying.getTimeConfig.asScala) - - override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) - - override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = - IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) - - override def offerBatch[In]( - messages: List[BatchedMessage[In, A]] - ): IO[List[BatchedReply[In, A]]] = - IO { - val javaMessages = messages.map(_.asJava).asJava - val javaReplies = underlying.offerBatch(javaMessages) - javaReplies.asScala.toList.map(_.asScala) - } - - override def tryPoll: IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.tryPoll()).map(_.asScala) - } - - override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = - IO { - val javaEnvelope = underlying.tryPollMany(batchMaxSize) - AckEnvelope( - payload = javaEnvelope.payload.asScala.toList, - messageId = MessageId.asScala(javaEnvelope.messageId), - timestamp = javaEnvelope.timestamp, - source = javaEnvelope.source, - deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), - acknowledge = IO.blocking(javaEnvelope.acknowledge()) - ) - } - - override def poll: IO[AckEnvelope[A]] = - IO.interruptible(underlying.poll().asScala) - - override def read(key: String): IO[Option[AckEnvelope[A]]] = - IO { - Option(underlying.read(key)).map(_.asScala) - } - - override def dropMessage(key: String): IO[Boolean] = - IO(underlying.dropMessage(key)) - - override def containsMessage(key: String): IO[Boolean] = - IO(underlying.containsMessage(key)) - - override def dropAllMessages(confirm: String): IO[Int] = - IO(underlying.dropAllMessages(confirm)) - - override def cron: IO[CronService[A]] = - IO(new CronServiceWrapper(underlying.getCron)) - } - - /** Wrapper for CronService that delegates to the JVM implementation. */ - private[scala] class CronServiceWrapper[A]( - underlying: jvm.CronService[A] - ) extends CronService[A] { - - override def installTick( - configHash: CronConfigHash, - keyPrefix: String, - messages: List[CronMessage[A]] - ): IO[Unit] = - IO { - val javaMessages = messages.map(_.asJava).asJava - underlying.installTick(configHash.asJava, keyPrefix, javaMessages) - } - - override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = - IO { - underlying.uninstallTick(configHash.asJava, keyPrefix) - } - - override def install( - configHash: CronConfigHash, - keyPrefix: String, - scheduleInterval: java.time.Duration, - generateMany: (Instant) => List[CronMessage[A]] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.install( - configHash.asJava, - keyPrefix, - scheduleInterval, - now => generateMany(now).map(_.asJava).asJava - ) - }).void - - override def installDailySchedule( - keyPrefix: String, - schedule: CronDailySchedule, - generator: (Instant) => CronMessage[A] - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installDailySchedule( - keyPrefix, - schedule.asJava, - now => generator(now).asJava - ) - }).void - - override def installPeriodicTick( - keyPrefix: String, - period: java.time.Duration, - generator: (Instant) => A - ): Resource[IO, Unit] = - Resource.fromAutoCloseable(IO { - underlying.installPeriodicTick( - keyPrefix, - period, - now => generator(now) - ) - }).void - } +/** Wrapper that implements the Scala DelayedQueue trait by delegating to a JVM DelayedQueue + * implementation. + */ +private[scala] class DelayedQueueWrapper[A]( + underlying: jvm.DelayedQueue[A] +) extends DelayedQueue[A] { + + override def getTimeConfig: IO[DelayedQueueTimeConfig] = + IO(underlying.getTimeConfig.asScala) + + override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala) + + override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] = + IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala) + + override def offerBatch[In]( + messages: List[BatchedMessage[In, A]] + ): IO[List[BatchedReply[In, A]]] = + IO { + val javaMessages = messages.map(_.asJava).asJava + val javaReplies = underlying.offerBatch(javaMessages) + javaReplies.asScala.toList.map(_.asScala) + } + + override def tryPoll: IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.tryPoll()).map(_.asScala) + } + + override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] = + IO { + val javaEnvelope = underlying.tryPollMany(batchMaxSize) + AckEnvelope( + payload = javaEnvelope.payload.asScala.toList, + messageId = MessageId.asScala(javaEnvelope.messageId), + timestamp = javaEnvelope.timestamp, + source = javaEnvelope.source, + deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType), + acknowledge = IO.blocking(javaEnvelope.acknowledge()) + ) + } + + override def poll: IO[AckEnvelope[A]] = + IO.interruptible(underlying.poll().asScala) + + override def read(key: String): IO[Option[AckEnvelope[A]]] = + IO { + Option(underlying.read(key)).map(_.asScala) + } + + override def dropMessage(key: String): IO[Boolean] = + IO(underlying.dropMessage(key)) + + override def containsMessage(key: String): IO[Boolean] = + IO(underlying.containsMessage(key)) + + override def dropAllMessages(confirm: String): IO[Int] = + IO(underlying.dropAllMessages(confirm)) + + override def cron: IO[CronService[A]] = + IO(new CronServiceWrapper(underlying.getCron)) +} + +/** Wrapper for CronService that delegates to the JVM implementation. */ +private[scala] class CronServiceWrapper[A]( + underlying: jvm.CronService[A] +) extends CronService[A] { + + override def installTick( + configHash: CronConfigHash, + keyPrefix: String, + messages: List[CronMessage[A]] + ): IO[Unit] = + IO { + val javaMessages = messages.map(_.asJava).asJava + underlying.installTick(configHash.asJava, keyPrefix, javaMessages) + } + + override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] = + IO { + underlying.uninstallTick(configHash.asJava, keyPrefix) + } + + override def install( + configHash: CronConfigHash, + keyPrefix: String, + scheduleInterval: java.time.Duration, + generateMany: (Instant) => List[CronMessage[A]] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.install( + configHash.asJava, + keyPrefix, + scheduleInterval, + now => generateMany(now).map(_.asJava).asJava + ) + }).void + + override def installDailySchedule( + keyPrefix: String, + schedule: CronDailySchedule, + generator: (Instant) => CronMessage[A] + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installDailySchedule( + keyPrefix, + schedule.asJava, + now => generator(now).asJava + ) + }).void + + override def installPeriodicTick( + keyPrefix: String, + period: java.time.Duration, + generator: (Instant) => A + ): Resource[IO, Unit] = + Resource.fromAutoCloseable(IO { + underlying.installPeriodicTick( + keyPrefix, + period, + now => generator(now) + ) + }).void } From cdd98a30f90ad43c1f507db16f2f5cce994d4079 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 18:08:38 +0200 Subject: [PATCH 6/7] Fix compilation errors --- build.sbt | 1 + .../funfix/delayedqueue/scala/CronSpec.scala | 3 --- .../scala/DataStructuresPropertySpec.scala | 18 +++++++++--------- .../delayedqueue/scala/RetryConfigSpec.scala | 1 - .../scala/ScheduledMessageSpec.scala | 4 ++-- 5 files changed, 12 insertions(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index f65e01e..585dbec 100644 --- a/build.sbt +++ b/build.sbt @@ -14,6 +14,7 @@ inThisBuild( scalacOptions ++= Seq( "-no-indent", "-Yexplicit-nulls", + "-Werror", ), // --- // Settings for dealing with the local Gradle-assembled artifacts diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala index ae7b7ee..7a5cbc3 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala @@ -20,9 +20,6 @@ import java.time.Instant import java.time.LocalTime import java.time.ZoneId import java.time.Duration -import org.funfix.delayedqueue.scala.CronConfigHash.asScala -import org.funfix.delayedqueue.scala.CronMessage.asScala as cronAsScala -import org.funfix.delayedqueue.scala.CronDailySchedule.asScala as scheduleAsScala class CronSpec extends munit.FunSuite { diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala index 5858825..d2ef261 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala @@ -21,17 +21,17 @@ import org.scalacheck.Prop.* import org.scalacheck.Gen import scala.concurrent.duration.FiniteDuration import java.time.{Instant, LocalTime, ZoneId} -import org.funfix.delayedqueue.scala.ScheduledMessage.asScala as scheduledAsScala -import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala as timeConfigAsScala -import org.funfix.delayedqueue.scala.MessageId.asScala as messageIdAsScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerOutcomeAsScala -import org.funfix.delayedqueue.scala.Generators.{given, *} +import org.funfix.delayedqueue.scala.ScheduledMessage.asScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala +import org.funfix.delayedqueue.scala.MessageId.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala +import org.funfix.delayedqueue.scala.Generators.given class DataStructuresPropertySpec extends ScalaCheckSuite { property("ScheduledMessage asJava/asScala roundtrip preserves data") { forAll { (key: String, payload: String, instant: Instant, canUpdate: Boolean) => val original = ScheduledMessage(key, payload, instant, canUpdate) - val roundtripped = original.asJava.scheduledAsScala + val roundtripped = original.asJava.asScala assertEquals(roundtripped.key, original.key) assertEquals(roundtripped.payload, original.payload) @@ -43,7 +43,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { property("DelayedQueueTimeConfig asJava/asScala roundtrip preserves data") { forAll { (acquireTimeout: FiniteDuration, pollPeriod: FiniteDuration) => val original = DelayedQueueTimeConfig(acquireTimeout, pollPeriod) - val roundtripped = original.asJava.timeConfigAsScala + val roundtripped = original.asJava.asScala assertEquals(roundtripped.acquireTimeout.toMillis, original.acquireTimeout.toMillis) assertEquals(roundtripped.pollPeriod.toMillis, original.pollPeriod.toMillis) @@ -54,7 +54,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { forAll { (value: String) => val messageId = MessageId(value) assertEquals(messageId.value, value) - assertEquals(messageId.asJava.messageIdAsScala.value, value) + assertEquals(messageId.asJava.asScala.value, value) } } @@ -117,7 +117,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { property("OfferOutcome asJava/asScala roundtrip") { forAll(Gen.oneOf(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored)) { outcome => - val roundtripped = outcome.asJava.offerOutcomeAsScala + val roundtripped = outcome.asJava.asScala assertEquals(roundtripped, outcome) } } diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala index 055299e..b416e1f 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala @@ -17,7 +17,6 @@ package org.funfix.delayedqueue.scala import java.time.Duration -import org.funfix.delayedqueue.jvm import org.funfix.delayedqueue.scala.RetryConfig.asScala class RetryConfigSpec extends munit.FunSuite { diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala index ffa6515..26ae911 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala @@ -18,7 +18,7 @@ package org.funfix.delayedqueue.scala import java.time.Instant import org.funfix.delayedqueue.scala.ScheduledMessage.asScala -import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerAsScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala class ScheduledMessageSpec extends munit.FunSuite { @@ -63,7 +63,7 @@ class ScheduledMessageSpec extends munit.FunSuite { val outcomes = List(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored) outcomes.foreach { outcome => - val roundtripped = outcome.asJava.offerAsScala + val roundtripped = outcome.asJava.asScala assertEquals(roundtripped, outcome) } } From bb1e2939a9e6b4804165cf6fc763c9fd63840d6d Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 9 Feb 2026 18:21:57 +0200 Subject: [PATCH 7/7] Remove SQLite --- .../delayedqueue/scala/DelayedQueueJDBCSpec.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala index fdde768..8763018 100644 --- a/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala +++ b/delayedqueue-scala/src/test/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCSpec.scala @@ -367,20 +367,6 @@ class DelayedQueueJDBCH2Spec extends DelayedQueueJDBCSpec { } } -/** SQLite database tests for DelayedQueueJDBC. */ -class DelayedQueueJDBCSQLiteSpec extends DelayedQueueJDBCSpec { - // SQLite needs special mode for shared cache in-memory database - private val testDbName = s"test_sqlite_${System.currentTimeMillis()}" - - override def createConfig(tableName: String, queueName: String): DelayedQueueJDBCConfig = { - val dbConfig = JdbcConnectionConfig( - url = s"jdbc:sqlite:file:$testDbName?mode=memory&cache=shared", - driver = JdbcDriver.Sqlite - ) - DelayedQueueJDBCConfig.create(dbConfig, tableName, queueName) - } -} - /** HSQLDB database tests for DelayedQueueJDBC. */ class DelayedQueueJDBCHSQLDBSpec extends DelayedQueueJDBCSpec { private val testDbName = s"test_hsqldb_${System.currentTimeMillis()}"