Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ inThisBuild(
scalacOptions ++= Seq(
"-no-indent",
"-Yexplicit-nulls",
"-Werror",
),
// ---
// Settings for dealing with the local Gradle-assembled artifacts
Expand Down Expand Up @@ -77,5 +78,9 @@ lazy val delayedqueueJVM = project
"org.typelevel" %% "cats-effect-testkit" % "3.6.3" % Test,
"org.scalacheck" %% "scalacheck" % "1.19.0" % Test,
"org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test,
// JDBC drivers for testing
"com.h2database" % "h2" % "2.4.240" % Test,
"org.hsqldb" % "hsqldb" % "2.7.4" % Test,
"org.xerial" % "sqlite-jdbc" % "3.51.1.0" % Test,
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
package org.funfix.delayedqueue.scala

import cats.effect.{IO, Resource, Clock}
import cats.syntax.functor.*
import java.time.{Clock as JavaClock, Instant}
import org.funfix.delayedqueue.jvm
import org.funfix.delayedqueue.scala.AckEnvelope.asScala
import org.funfix.delayedqueue.scala.OfferOutcome.asScala
import org.funfix.delayedqueue.scala.BatchedReply.asScala
import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala
import scala.jdk.CollectionConverters.*
import cats.effect.std.Dispatcher

/** In-memory implementation of [[DelayedQueue]] using concurrent data structures.
Expand Down Expand Up @@ -92,137 +86,12 @@ object DelayedQueueInMemory {
ackEnvSource,
javaClock
)
new DelayedQueueInMemoryWrapper(jvmQueue)
new DelayedQueueWrapper(jvmQueue)
}
}

/** Wrapper that implements the Scala DelayedQueue trait by delegating to the JVM implementation.
*/
private class DelayedQueueInMemoryWrapper[A](
underlying: jvm.DelayedQueueInMemory[A]
) extends DelayedQueue[A] {

override def getTimeConfig: IO[DelayedQueueTimeConfig] =
IO(underlying.getTimeConfig.asScala)

override def offerOrUpdate(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] =
IO(underlying.offerOrUpdate(key, payload, scheduleAt).asScala)

override def offerIfNotExists(key: String, payload: A, scheduleAt: Instant): IO[OfferOutcome] =
IO(underlying.offerIfNotExists(key, payload, scheduleAt).asScala)

override def offerBatch[In](
messages: List[BatchedMessage[In, A]]
): IO[List[BatchedReply[In, A]]] =
IO {
val javaMessages = messages.map(_.asJava).asJava
val javaReplies = underlying.offerBatch(javaMessages)
javaReplies.asScala.toList.map(_.asScala)
}

override def tryPoll: IO[Option[AckEnvelope[A]]] =
IO {
Option(underlying.tryPoll()).map(_.asScala)
}

override def tryPollMany(batchMaxSize: Int): IO[AckEnvelope[List[A]]] =
IO {
val javaEnvelope = underlying.tryPollMany(batchMaxSize)
AckEnvelope(
payload = javaEnvelope.payload.asScala.toList,
messageId = MessageId.asScala(javaEnvelope.messageId),
timestamp = javaEnvelope.timestamp,
source = javaEnvelope.source,
deliveryType = DeliveryType.asScala(javaEnvelope.deliveryType),
acknowledge = IO.blocking(javaEnvelope.acknowledge())
)
}

override def poll: IO[AckEnvelope[A]] =
IO.interruptible(underlying.poll().asScala)

override def read(key: String): IO[Option[AckEnvelope[A]]] =
IO {
Option(underlying.read(key)).map(_.asScala)
}

override def dropMessage(key: String): IO[Boolean] =
IO(underlying.dropMessage(key))

override def containsMessage(key: String): IO[Boolean] =
IO(underlying.containsMessage(key))

override def dropAllMessages(confirm: String): IO[Int] =
IO(underlying.dropAllMessages(confirm))

override def cron: IO[CronService[A]] =
IO(new CronServiceWrapper(underlying.getCron))
}

/** Wrapper for CronService that delegates to the JVM implementation. */
private class CronServiceWrapper[A](
underlying: jvm.CronService[A]
) extends CronService[A] {

override def installTick(
configHash: CronConfigHash,
keyPrefix: String,
messages: List[CronMessage[A]]
): IO[Unit] =
IO {
val javaMessages = messages.map(_.asJava).asJava
underlying.installTick(configHash.asJava, keyPrefix, javaMessages)
}

override def uninstallTick(configHash: CronConfigHash, keyPrefix: String): IO[Unit] =
IO {
underlying.uninstallTick(configHash.asJava, keyPrefix)
}

override def install(
configHash: CronConfigHash,
keyPrefix: String,
scheduleInterval: java.time.Duration,
generateMany: (Instant) => List[CronMessage[A]]
): Resource[IO, Unit] =
Resource.fromAutoCloseable(IO {
underlying.install(
configHash.asJava,
keyPrefix,
scheduleInterval,
now => generateMany(now).map(_.asJava).asJava
)
}).void

override def installDailySchedule(
keyPrefix: String,
schedule: CronDailySchedule,
generator: (Instant) => CronMessage[A]
): Resource[IO, Unit] =
Resource.fromAutoCloseable(IO {
underlying.installDailySchedule(
keyPrefix,
schedule.asJava,
now => generator(now).asJava
)
}).void

override def installPeriodicTick(
keyPrefix: String,
period: java.time.Duration,
generator: (Instant) => A
): Resource[IO, Unit] =
Resource.fromAutoCloseable(IO {
underlying.installPeriodicTick(
keyPrefix,
period,
now => generator(now)
)
}).void
}
}

private final class CatsClockToJavaClock(
private[scala] final class CatsClockToJavaClock(
dispatcher: Dispatcher[IO],
zone: java.time.ZoneId = java.time.ZoneId.systemDefault()
)(using Clock[IO]) extends JavaClock {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2026 Alexandru Nedelcu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.funfix.delayedqueue.scala

import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import org.funfix.delayedqueue.jvm

/** JDBC-based implementation of [[DelayedQueue]] with support for multiple database backends.
*
* This implementation wraps the JVM [[org.funfix.delayedqueue.jvm.DelayedQueueJDBC]] and provides
* an idiomatic Scala API with Cats Effect IO for managing side effects.
*
* ==Example==
*
* {{{
* import cats.effect.IO
* import java.time.Instant
*
* val dbConfig = JdbcConnectionConfig(
* url = "jdbc:h2:mem:testdb",
* driver = JdbcDriver.H2,
* username = Some("sa"),
* password = Some("")
* )
*
* val config = DelayedQueueJDBCConfig.create(
* db = dbConfig,
* tableName = "delayed_queue",
* queueName = "my-queue"
* )
*
* // Run migrations first (once per database)
* DelayedQueueJDBC.runMigrations(config).unsafeRunSync()
*
* // Create and use the queue (using implicit PayloadCodec.forStrings)
* DelayedQueueJDBC[String](config).use { queue =>
* for {
* _ <- queue.offerOrUpdate("key1", "Hello", Instant.now().plusSeconds(10))
* envelope <- queue.poll
* _ <- IO.println(s"Received: $${envelope.payload}")
* _ <- envelope.acknowledge
* } yield ()
* }
* }}}
*
* @tparam A
* the type of message payloads
*/
object DelayedQueueJDBC {

/** Creates a JDBC-based delayed queue with the given configuration.
*
* @tparam A
* the type of message payloads
* @param config
* JDBC queue configuration
* @param codec
* implicit payload codec for serialization (e.g., PayloadCodec.forStrings for String)
* @return
* a Resource that manages the queue lifecycle
*/
def apply[A](
config: DelayedQueueJDBCConfig
)(using codec: PayloadCodec[A]): Resource[IO, DelayedQueue[A]] =
Dispatcher.sequential[IO].flatMap { dispatcher =>
Resource.fromAutoCloseable(IO {
val javaClock = CatsClockToJavaClock(dispatcher)
jvm.DelayedQueueJDBC.create[A](
codec.asJava,
config.asJava,
javaClock
)
}).map(jvmQueue => new DelayedQueueWrapper(jvmQueue))
}

/** Runs database migrations for the queue.
*
* This should be called once per database before creating queue instances.
*
* @param config
* JDBC queue configuration
* @return
* IO action that runs the migrations
*/
def runMigrations(
config: DelayedQueueJDBCConfig
): IO[Unit] =
IO.interruptible {
jvm.DelayedQueueJDBC.runMigrations(
config.asJava
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2026 Alexandru Nedelcu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.funfix.delayedqueue.scala

import org.funfix.delayedqueue.jvm
import scala.util.control.NonFatal

/** Type-class for encoding and decoding message payloads to/from binary data.
*
* This is used by JDBC implementations to store message payloads in the database.
*
* @tparam A
* the type of message payloads
*/
trait PayloadCodec[A] {

/** Returns the fully-qualified type name of the messages this codec handles.
*
* This is used for queue partitioning and message routing.
*
* @return
* the fully-qualified type name (e.g., "java.lang.String")
*/
def typeName: String

/** Serializes a payload to a byte array.
*
* @param payload
* the payload to serialize
* @return
* the serialized byte representation
*/
def serialize(payload: A): Array[Byte]

/** Deserializes a payload from a byte array.
*
* @param serialized
* the serialized bytes
* @return
* Either the deserialized payload or an IllegalArgumentException if parsing fails
*/
def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A]

/** Converts this Scala PayloadCodec to a JVM MessageSerializer. */
def asJava: jvm.MessageSerializer[A] =
new jvm.MessageSerializer[A] {
override def getTypeName(): String = PayloadCodec.this.typeName
override def serialize(payload: A): Array[Byte] = PayloadCodec.this.serialize(payload)
override def deserialize(serialized: Array[Byte]): A =
PayloadCodec.this.deserialize(serialized) match {
case Right(value) => value
case Left(error) => throw error
}
}
}

object PayloadCodec {

/** Given instance for String payloads using UTF-8 encoding.
*
* This is based on the JVM MessageSerializer.forStrings implementation.
*/
given forStrings: PayloadCodec[String] =
fromJava(jvm.MessageSerializer.forStrings())

/** Wraps a JVM MessageSerializer to provide a Scala PayloadCodec interface. */
def fromJava[A](javaSerializer: jvm.MessageSerializer[A]): PayloadCodec[A] =
new PayloadCodec[A] {
override def typeName: String = javaSerializer.getTypeName()
override def serialize(payload: A): Array[Byte] = javaSerializer.serialize(payload)
override def deserialize(serialized: Array[Byte]): Either[IllegalArgumentException, A] =
try {
Right(javaSerializer.deserialize(serialized))
} catch {
case e: IllegalArgumentException => Left(e)
case NonFatal(e) => Left(new IllegalArgumentException(e.getMessage, e))
}
}
}
Loading