diff --git a/build.sbt b/build.sbt index ea629b8..323d9fc 100644 --- a/build.sbt +++ b/build.sbt @@ -78,6 +78,8 @@ lazy val delayedqueue = crossProject(JVMPlatform) // Testing "org.scalameta" %% "munit" % "1.0.4" % Test, "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test, + "org.scalacheck" %% "scalacheck" % "1.19.0" % Test, + "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, ) ) diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala new file mode 100644 index 0000000..80ebc8a --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala @@ -0,0 +1,137 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.IO +import java.time.Instant +import org.funfix.delayedqueue.jvm + +/** Message envelope that includes an acknowledgment callback. + * + * This wrapper is returned when polling messages from the queue. It contains the message payload + * plus metadata and an acknowledgment function that should be called after processing completes. + * + * This type is not serializable. + * + * ==Example== + * + * {{{ + * for { + * envelope <- queue.poll + * _ <- processMessage(envelope.payload) + * _ <- envelope.acknowledge + * } yield () + * }}} + * + * @tparam A + * the type of the message payload + * @param payload + * the actual message content + * @param messageId + * unique identifier for tracking this message + * @param timestamp + * when this envelope was created (poll time) + * @param source + * identifier for the queue or source system + * @param deliveryType + * indicates whether this is the first delivery or a redelivery + * @param acknowledge + * IO action to call to acknowledge successful processing, and delete the message from the queue + */ +final case class AckEnvelope[+A]( + payload: A, + messageId: MessageId, + timestamp: Instant, + source: String, + deliveryType: DeliveryType, + acknowledge: IO[Unit] +) + +object AckEnvelope { + + /** Conversion extension for JVM AckEnvelope. */ + extension [A](javaEnv: jvm.AckEnvelope[A]) { + + /** Converts a JVM AckEnvelope to a Scala AckEnvelope. */ + def asScala: AckEnvelope[A] = + AckEnvelope( + payload = javaEnv.payload, + messageId = MessageId.asScala(javaEnv.messageId), + timestamp = javaEnv.timestamp, + source = javaEnv.source, + deliveryType = DeliveryType.asScala(javaEnv.deliveryType), + acknowledge = IO.blocking(javaEnv.acknowledge()) + ) + } +} + +/** Unique identifier for a message. */ +opaque type MessageId = String + +object MessageId { + + /** Creates a MessageId from a String value. */ + def apply(value: String): MessageId = value + + /** Conversion extension for String to MessageId. */ + extension (id: MessageId) { + + /** Gets the string value of the MessageId. */ + def value: String = id + + /** Converts this Scala MessageId to a JVM MessageId. */ + def asJava: jvm.MessageId = + new jvm.MessageId(id) + } + + /** Conversion extension for JVM MessageId. */ + extension (javaId: jvm.MessageId) { + + /** Converts a JVM MessageId to a Scala MessageId. */ + def asScala: MessageId = + MessageId(javaId.value) + } +} + +/** Indicates whether a message is being delivered for the first time or redelivered. */ +enum DeliveryType { + + /** Message is being delivered for the first time. */ + case FirstDelivery + + /** Message is being redelivered (was scheduled again after initial delivery). */ + case Redelivery + + /** Converts this Scala DeliveryType to a JVM DeliveryType. */ + def asJava: jvm.DeliveryType = this match { + case FirstDelivery => jvm.DeliveryType.FIRST_DELIVERY + case Redelivery => jvm.DeliveryType.REDELIVERY + } +} + +object DeliveryType { + + /** Conversion extension for JVM DeliveryType. */ + extension (javaType: jvm.DeliveryType) { + + /** Converts a JVM DeliveryType to a Scala DeliveryType. */ + def asScala: DeliveryType = javaType match { + case jvm.DeliveryType.FIRST_DELIVERY => DeliveryType.FirstDelivery + case jvm.DeliveryType.REDELIVERY => DeliveryType.Redelivery + } + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala new file mode 100644 index 0000000..44957a8 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Hash of a cron configuration, used to detect configuration changes. + * + * When a cron schedule is installed, this hash is used to identify messages belonging to that + * configuration. If the configuration changes, the hash will differ, allowing the system to clean + * up old scheduled messages. + */ +opaque type CronConfigHash = String + +object CronConfigHash { + + /** Creates a CronConfigHash from a String value. */ + def apply(value: String): CronConfigHash = value + + /** Conversion extension for CronConfigHash. */ + extension (hash: CronConfigHash) { + + /** Gets the string value of the CronConfigHash. */ + def value: String = hash + + /** Converts this Scala CronConfigHash to a JVM CronConfigHash. */ + def asJava: jvm.CronConfigHash = + new jvm.CronConfigHash(hash) + } + + /** Creates a ConfigHash from a daily cron schedule configuration. */ + def fromDailyCron(config: CronDailySchedule): CronConfigHash = + jvm.CronConfigHash.fromDailyCron(config.asJava).asScala + + /** Creates a ConfigHash from a periodic tick configuration. */ + def fromPeriodicTick(period: java.time.Duration): CronConfigHash = + jvm.CronConfigHash.fromPeriodicTick(period).asScala + + /** Creates a ConfigHash from an arbitrary string. */ + def fromString(text: String): CronConfigHash = + jvm.CronConfigHash.fromString(text).asScala + + /** Conversion extension for JVM CronConfigHash. */ + extension (javaHash: jvm.CronConfigHash) { + + /** Converts a JVM CronConfigHash to a Scala CronConfigHash. */ + def asScala: CronConfigHash = + CronConfigHash(javaHash.value) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala new file mode 100644 index 0000000..4849198 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala @@ -0,0 +1,100 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneId +import org.funfix.delayedqueue.jvm +import scala.jdk.CollectionConverters.* + +/** Configuration for daily recurring scheduled messages with timezone support. + * + * This class defines when messages should be scheduled each day, with support for multiple times + * per day and scheduling messages in advance. + * + * @param zoneId + * the timezone for interpreting the hours of day + * @param hoursOfDay + * the times during each day when messages should be scheduled (must not be empty) + * @param scheduleInAdvance + * how far in advance to schedule messages + * @param scheduleInterval + * how often to check and update the schedule + */ +final case class CronDailySchedule( + zoneId: ZoneId, + hoursOfDay: List[LocalTime], + scheduleInAdvance: Duration, + scheduleInterval: Duration +) { + require(hoursOfDay.nonEmpty, "hoursOfDay must not be empty") + require( + !scheduleInterval.isZero && !scheduleInterval.isNegative, + "scheduleInterval must be positive" + ) + + /** Calculates the next scheduled times starting from now. + * + * Returns all times that should be scheduled, from now until (now + scheduleInAdvance). Always + * returns at least one time (the next scheduled time), even if it's beyond scheduleInAdvance. + * + * @param now + * the current time + * @return + * list of future instants when messages should be scheduled (never empty) + */ + def getNextTimes(now: Instant): List[Instant] = { + import scala.jdk.CollectionConverters.* + asJava.getNextTimes(now).asScala.toList + } + + /** Converts this Scala CronDailySchedule to a JVM CronDailySchedule. */ + def asJava: jvm.CronDailySchedule = + new jvm.CronDailySchedule( + zoneId, + hoursOfDay.asJava, + scheduleInAdvance, + scheduleInterval + ) +} + +object CronDailySchedule { + + /** Creates a DailyCronSchedule with the specified configuration. */ + def create( + zoneId: ZoneId, + hoursOfDay: List[LocalTime], + scheduleInAdvance: Duration, + scheduleInterval: Duration + ): CronDailySchedule = + CronDailySchedule(zoneId, hoursOfDay, scheduleInAdvance, scheduleInterval) + + /** Conversion extension for JVM CronDailySchedule. */ + extension (javaSchedule: jvm.CronDailySchedule) { + + /** Converts a JVM CronDailySchedule to a Scala CronDailySchedule. */ + def asScala: CronDailySchedule = + CronDailySchedule( + zoneId = javaSchedule.zoneId, + hoursOfDay = javaSchedule.hoursOfDay.asScala.toList, + scheduleInAdvance = javaSchedule.scheduleInAdvance, + scheduleInterval = javaSchedule.scheduleInterval + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala new file mode 100644 index 0000000..987964e --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.util.Locale +import org.funfix.delayedqueue.jvm + +/** Represents a message for periodic (cron-like) scheduling. + * + * This wrapper is used for messages that should be scheduled repeatedly. The `scheduleAt` is used + * to generate the unique key, while `scheduleAtActual` allows for a different execution time + * (e.g., to add a delay). + * + * @tparam A + * the type of the message payload + * @param payload + * the message content + * @param scheduleAt + * the nominal schedule time (used for key generation) + * @param scheduleAtActual + * the actual execution time (defaults to scheduleAt if None) + */ +final case class CronMessage[+A]( + payload: A, + scheduleAt: Instant, + scheduleAtActual: Option[Instant] = None +) { + + /** Converts this CronMessage to a ScheduledMessage. + * + * @param configHash + * the configuration hash for this cron job + * @param keyPrefix + * the prefix for generating unique keys + * @param canUpdate + * whether the resulting message can update existing entries + */ + def toScheduled( + configHash: CronConfigHash, + keyPrefix: String, + canUpdate: Boolean + ): ScheduledMessage[A] = + ScheduledMessage( + key = CronMessage.key(configHash, keyPrefix, scheduleAt), + payload = payload, + scheduleAt = scheduleAtActual.getOrElse(scheduleAt), + canUpdate = canUpdate + ) + + /** Converts this Scala CronMessage to a JVM CronMessage. */ + def asJava[A1 >: A]: jvm.CronMessage[A1] = + new jvm.CronMessage[A1](payload, scheduleAt, scheduleAtActual.getOrElse(null)) +} + +object CronMessage { + + /** Generates a unique key for a cron message. + * + * @param configHash + * the configuration hash + * @param keyPrefix + * the key prefix + * @param scheduleAt + * the schedule time + * @return + * a unique key string + */ + def key(configHash: CronConfigHash, keyPrefix: String, scheduleAt: Instant): String = + jvm.CronMessage.key(configHash.asJava, keyPrefix, scheduleAt) + + /** Creates a factory function that produces CronMessages with a static payload. + * + * @param payload + * the static payload to use for all generated messages + * @return + * a function that creates CronMessages for any given instant + */ + def staticPayload[A](payload: A): CronMessageGenerator[A] = { + val jvmGenerator = jvm.CronMessage.staticPayload(payload) + (scheduleAt: Instant) => jvmGenerator.invoke(scheduleAt).asScala + } + + /** Conversion extension for JVM CronMessage. */ + extension [A](javaMsg: jvm.CronMessage[A]) { + + /** Converts a JVM CronMessage to a Scala CronMessage. */ + def asScala: CronMessage[A] = + CronMessage( + payload = javaMsg.payload, + scheduleAt = javaMsg.scheduleAt, + scheduleAtActual = Option(javaMsg.scheduleAtActual) + ) + } +} + +/** Function that generates CronMessages for given instants. */ +trait CronMessageGenerator[A] { + + /** Creates a cron message for the given instant. */ + def apply(at: Instant): CronMessage[A] +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala new file mode 100644 index 0000000..8c64d53 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Configuration for JDBC-based delayed queue instances. + * + * This configuration groups together all settings needed to create a DelayedQueueJDBC instance. + * + * ==Example== + * + * {{{ + * val dbConfig = JdbcConnectionConfig( + * url = "jdbc:hsqldb:mem:testdb", + * driver = JdbcDriver.HSQLDB, + * username = "SA", + * password = "", + * pool = null + * ) + * + * val config = DelayedQueueJDBCConfig( + * db = dbConfig, + * tableName = "delayed_queue_table", + * time = DelayedQueueTimeConfig.DEFAULT_JDBC, + * queueName = "my-queue", + * ackEnvSource = "DelayedQueueJDBC:my-queue", + * retryPolicy = Some(RetryConfig.DEFAULT) + * ) + * }}} + * + * @param db + * JDBC connection configuration + * @param tableName + * Name of the database table to use for storing queue messages + * @param time + * Time configuration for queue operations (poll periods, timeouts, etc.) + * @param queueName + * Unique name for this queue instance, used for partitioning messages in shared tables. Multiple + * queue instances can share the same database table if they have different queue names. + * @param ackEnvSource + * Source identifier for acknowledgement envelopes, used for tracing and debugging. Typically, + * follows the pattern "DelayedQueueJDBC:{queueName}". + * @param retryPolicy + * Optional retry configuration for database operations. If None, uses RetryConfig.DEFAULT. + */ +final case class DelayedQueueJDBCConfig( + db: JdbcConnectionConfig, + tableName: String, + time: DelayedQueueTimeConfig, + queueName: String, + ackEnvSource: String, + retryPolicy: Option[RetryConfig] +) { + require(tableName.nonEmpty, "tableName must not be blank") + require(queueName.nonEmpty, "queueName must not be blank") + require(ackEnvSource.nonEmpty, "ackEnvSource must not be blank") + + /** Converts this Scala DelayedQueueJDBCConfig to a JVM DelayedQueueJDBCConfig. */ + def asJava: jvm.DelayedQueueJDBCConfig = + new jvm.DelayedQueueJDBCConfig( + db.asJava, + tableName, + time.asJava, + queueName, + ackEnvSource, + retryPolicy.map(_.asJava).getOrElse(null) + ) +} + +object DelayedQueueJDBCConfig { + + /** Creates a default configuration for the given database, table name, and queue name. + * + * @param db + * JDBC connection configuration + * @param tableName + * Name of the database table to use + * @param queueName + * Unique name for this queue instance + * @return + * A configuration with default time and retry policies + */ + def create( + db: JdbcConnectionConfig, + tableName: String, + queueName: String + ): DelayedQueueJDBCConfig = + DelayedQueueJDBCConfig( + db = db, + tableName = tableName, + time = DelayedQueueTimeConfig.DEFAULT_JDBC, + queueName = queueName, + ackEnvSource = s"DelayedQueueJDBC:$queueName", + retryPolicy = Some(RetryConfig.DEFAULT) + ) + + /** Conversion extension for JVM DelayedQueueJDBCConfig. */ + extension (javaConfig: jvm.DelayedQueueJDBCConfig) { + + /** Converts a JVM DelayedQueueJDBCConfig to a Scala DelayedQueueJDBCConfig. */ + def asScala: DelayedQueueJDBCConfig = + DelayedQueueJDBCConfig( + db = JdbcConnectionConfig.asScala(javaConfig.db), + tableName = javaConfig.tableName, + time = DelayedQueueTimeConfig.asScala(javaConfig.time), + queueName = javaConfig.queueName, + ackEnvSource = javaConfig.ackEnvSource, + retryPolicy = Option(javaConfig.retryPolicy).map(RetryConfig.asScala) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala new file mode 100644 index 0000000..bb34909 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import scala.concurrent.duration.FiniteDuration +import org.funfix.delayedqueue.jvm + +/** Time configuration for delayed queue operations. + * + * @param acquireTimeout + * maximum time to wait when acquiring/locking a message for processing + * @param pollPeriod + * interval between poll attempts when no messages are available + */ +final case class DelayedQueueTimeConfig( + acquireTimeout: FiniteDuration, + pollPeriod: FiniteDuration +) { + + /** Converts this Scala DelayedQueueTimeConfig to a JVM DelayedQueueTimeConfig. */ + def asJava: jvm.DelayedQueueTimeConfig = + new jvm.DelayedQueueTimeConfig( + java.time.Duration.ofMillis(acquireTimeout.toMillis), + java.time.Duration.ofMillis(pollPeriod.toMillis) + ) +} + +object DelayedQueueTimeConfig { + + /** Default configuration for DelayedQueueInMemory. */ + val DEFAULT_IN_MEMORY: DelayedQueueTimeConfig = + jvm.DelayedQueueTimeConfig.DEFAULT_IN_MEMORY.asScala + + /** Default configuration for JDBC-based implementations, with longer acquire timeouts and poll + * periods to reduce database load in production environments. + */ + val DEFAULT_JDBC: DelayedQueueTimeConfig = + jvm.DelayedQueueTimeConfig.DEFAULT_JDBC.asScala + + /** Default configuration for testing, with shorter timeouts and poll periods to speed up tests. + */ + val DEFAULT_TESTING: DelayedQueueTimeConfig = + jvm.DelayedQueueTimeConfig.DEFAULT_TESTING.asScala + + /** Conversion extension for JVM DelayedQueueTimeConfig. */ + extension (javaConfig: jvm.DelayedQueueTimeConfig) { + + /** Converts a JVM DelayedQueueTimeConfig to a Scala DelayedQueueTimeConfig. */ + def asScala: DelayedQueueTimeConfig = + DelayedQueueTimeConfig( + acquireTimeout = + FiniteDuration( + javaConfig.acquireTimeout.toMillis, + scala.concurrent.duration.MILLISECONDS + ), + pollPeriod = + FiniteDuration(javaConfig.pollPeriod.toMillis, scala.concurrent.duration.MILLISECONDS) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala new file mode 100644 index 0000000..817c081 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala @@ -0,0 +1,68 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Represents the configuration for a JDBC connection. + * + * @param url + * the JDBC connection URL + * @param driver + * the JDBC driver to use + * @param username + * optional username for authentication + * @param password + * optional password for authentication + * @param pool + * optional connection pool configuration + */ +final case class JdbcConnectionConfig( + url: String, + driver: JdbcDriver, + username: Option[String] = None, + password: Option[String] = None, + pool: Option[JdbcDatabasePoolConfig] = None +) { + + /** Converts this Scala JdbcConnectionConfig to a JVM JdbcConnectionConfig. */ + def asJava: jvm.JdbcConnectionConfig = + new jvm.JdbcConnectionConfig( + url, + driver.asJava, + username.getOrElse(null), + password.getOrElse(null), + pool.map(_.asJava).getOrElse(null) + ) +} + +object JdbcConnectionConfig { + + /** Conversion extension for JVM JdbcConnectionConfig. */ + extension (javaConfig: jvm.JdbcConnectionConfig) { + + /** Converts a JVM JdbcConnectionConfig to a Scala JdbcConnectionConfig. */ + def asScala: JdbcConnectionConfig = + JdbcConnectionConfig( + url = javaConfig.url, + driver = JdbcDriver.asScala(javaConfig.driver), + username = Option(javaConfig.username), + password = Option(javaConfig.password), + pool = Option(javaConfig.pool).map(JdbcDatabasePoolConfig.asScala) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala new file mode 100644 index 0000000..1ac2a33 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm + +/** Configuration for tuning the Hikari connection pool. + * + * @param connectionTimeout + * maximum time to wait for a connection from the pool + * @param idleTimeout + * maximum time a connection can sit idle in the pool + * @param maxLifetime + * maximum lifetime of a connection in the pool + * @param keepaliveTime + * frequency of keepalive checks + * @param maximumPoolSize + * maximum number of connections in the pool + * @param minimumIdle + * minimum number of idle connections to maintain + * @param leakDetectionThreshold + * time before a connection is considered leaked + * @param initializationFailTimeout + * time to wait for pool initialization + */ +final case class JdbcDatabasePoolConfig( + connectionTimeout: Duration, + idleTimeout: Duration, + maxLifetime: Duration, + keepaliveTime: Duration, + maximumPoolSize: Int, + minimumIdle: Option[Int], + leakDetectionThreshold: Option[Duration], + initializationFailTimeout: Option[Duration] +) { + + /** Converts this Scala JdbcDatabasePoolConfig to a JVM JdbcDatabasePoolConfig. */ + def asJava: jvm.JdbcDatabasePoolConfig = + new jvm.JdbcDatabasePoolConfig( + connectionTimeout, + idleTimeout, + maxLifetime, + keepaliveTime, + maximumPoolSize, + minimumIdle.map(Int.box).getOrElse(null), + leakDetectionThreshold.getOrElse(null), + initializationFailTimeout.getOrElse(null) + ) +} + +object JdbcDatabasePoolConfig { + + /** Default connection pool configuration. */ + val DEFAULT: JdbcDatabasePoolConfig = + new jvm.JdbcDatabasePoolConfig().asScala + + /** Conversion extension for JVM JdbcDatabasePoolConfig. */ + extension (javaConfig: jvm.JdbcDatabasePoolConfig) { + + /** Converts a JVM JdbcDatabasePoolConfig to a Scala JdbcDatabasePoolConfig. */ + def asScala: JdbcDatabasePoolConfig = + JdbcDatabasePoolConfig( + connectionTimeout = javaConfig.connectionTimeout, + idleTimeout = javaConfig.idleTimeout, + maxLifetime = javaConfig.maxLifetime, + keepaliveTime = javaConfig.keepaliveTime, + maximumPoolSize = javaConfig.maximumPoolSize, + minimumIdle = Option(javaConfig.minimumIdle).map(_.intValue), + leakDetectionThreshold = Option(javaConfig.leakDetectionThreshold), + initializationFailTimeout = Option(javaConfig.initializationFailTimeout) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala new file mode 100644 index 0000000..3fb5bb0 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** JDBC driver configurations. + * + * @param className + * the JDBC driver class name + */ +final case class JdbcDriver private (className: String) { + + /** Converts this Scala JdbcDriver to a JVM JdbcDriver. */ + def asJava: jvm.JdbcDriver = + JdbcDriver.jvmEntries.getOrElse( + this, + throw new IllegalArgumentException(s"Unknown JDBC driver: $className") + ) +} + +object JdbcDriver { + + val HSQLDB: JdbcDriver = jvm.JdbcDriver.HSQLDB.asScala + val H2: JdbcDriver = jvm.JdbcDriver.H2.asScala + val MsSqlServer: JdbcDriver = jvm.JdbcDriver.MsSqlServer.asScala + val Sqlite: JdbcDriver = jvm.JdbcDriver.Sqlite.asScala + val MariaDB: JdbcDriver = jvm.JdbcDriver.MariaDB.asScala + val MySQL: JdbcDriver = jvm.JdbcDriver.MySQL.asScala + val PostgreSQL: JdbcDriver = jvm.JdbcDriver.PostgreSQL.asScala + val Oracle: JdbcDriver = jvm.JdbcDriver.Oracle.asScala + + val entries: List[JdbcDriver] = + List(H2, HSQLDB, MariaDB, MsSqlServer, MySQL, PostgreSQL, Sqlite, Oracle) + + private val jvmEntries: Map[JdbcDriver, jvm.JdbcDriver] = Map( + H2 -> jvm.JdbcDriver.H2, + HSQLDB -> jvm.JdbcDriver.HSQLDB, + MariaDB -> jvm.JdbcDriver.MariaDB, + MsSqlServer -> jvm.JdbcDriver.MsSqlServer, + MySQL -> jvm.JdbcDriver.MySQL, + PostgreSQL -> jvm.JdbcDriver.PostgreSQL, + Sqlite -> jvm.JdbcDriver.Sqlite, + Oracle -> jvm.JdbcDriver.Oracle + ) + + /** Attempt to find a JdbcDriver by its class name. + * + * @param className + * the JDBC driver class name + * @return + * the JdbcDriver if found, None otherwise + */ + def fromClassName(className: String): Option[JdbcDriver] = + entries.find(_.className.equalsIgnoreCase(className)) + + /** Conversion extension for JVM JdbcDriver. */ + extension (javaDriver: jvm.JdbcDriver) { + + /** Converts a JVM JdbcDriver to a Scala JdbcDriver. */ + def asScala: JdbcDriver = + JdbcDriver(javaDriver.getClassName()) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala new file mode 100644 index 0000000..4f34a92 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Outcome of offering a message to the delayed queue. + * + * This sealed trait represents the possible results when adding or updating a message in the + * queue. + */ +sealed trait OfferOutcome { + + /** Returns true if the offer was ignored (message already exists and cannot be updated). */ + def isIgnored: Boolean = this == OfferOutcome.Ignored + + /** Converts this Scala OfferOutcome to a JVM OfferOutcome. */ + def asJava: jvm.OfferOutcome = this match { + case OfferOutcome.Created => jvm.OfferOutcome.Created.INSTANCE + case OfferOutcome.Updated => jvm.OfferOutcome.Updated.INSTANCE + case OfferOutcome.Ignored => jvm.OfferOutcome.Ignored.INSTANCE + } +} + +object OfferOutcome { + + /** Message was successfully created (new entry). */ + case object Created extends OfferOutcome + + /** Message was successfully updated (existing entry modified). */ + case object Updated extends OfferOutcome + + /** Message offer was ignored (already exists and canUpdate was false). */ + case object Ignored extends OfferOutcome + + /** Conversion extension for JVM OfferOutcome. */ + extension (javaOutcome: jvm.OfferOutcome) { + + /** Converts a JVM OfferOutcome to a Scala OfferOutcome. */ + def asScala: OfferOutcome = javaOutcome match { + case _: jvm.OfferOutcome.Created => OfferOutcome.Created + case _: jvm.OfferOutcome.Updated => OfferOutcome.Updated + case _: jvm.OfferOutcome.Ignored => OfferOutcome.Ignored + } + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala new file mode 100644 index 0000000..e967bd9 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala @@ -0,0 +1,117 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm + +/** Configuration for retry loops with exponential backoff. + * + * Used to configure retry behavior for database operations that may experience transient failures + * such as deadlocks, connection issues, or transaction rollbacks. + * + * ==Example== + * + * {{{ + * val config = RetryConfig( + * maxRetries = Some(3), + * totalSoftTimeout = Some(Duration.ofSeconds(30)), + * perTryHardTimeout = Some(Duration.ofSeconds(10)), + * initialDelay = Duration.ofMillis(100), + * maxDelay = Duration.ofSeconds(5), + * backoffFactor = 2.0 + * ) + * }}} + * + * @param initialDelay + * Initial delay before first retry + * @param maxDelay + * Maximum delay between retries (backoff is capped at this value) + * @param backoffFactor + * Multiplier for exponential backoff (e.g., 2.0 for doubling delays) + * @param maxRetries + * Maximum number of retries (None means unlimited retries) + * @param totalSoftTimeout + * Total time after which retries stop (None means no timeout) + * @param perTryHardTimeout + * Hard timeout for each individual attempt (None means no per-try timeout) + */ +final case class RetryConfig( + initialDelay: Duration, + maxDelay: Duration, + backoffFactor: Double, + maxRetries: Option[Long], + totalSoftTimeout: Option[Duration], + perTryHardTimeout: Option[Duration] +) { + require(backoffFactor >= 1.0, s"backoffFactor must be >= 1.0, got $backoffFactor") + require(!initialDelay.isNegative, s"initialDelay must not be negative, got $initialDelay") + require(!maxDelay.isNegative, s"maxDelay must not be negative, got $maxDelay") + require(maxRetries.forall(_ >= 0), s"maxRetries must be >= 0 or None, got $maxRetries") + require( + totalSoftTimeout.forall(!_.isNegative), + s"totalSoftTimeout must not be negative, got $totalSoftTimeout" + ) + require( + perTryHardTimeout.forall(!_.isNegative), + s"perTryHardTimeout must not be negative, got $perTryHardTimeout" + ) + + /** Converts this Scala RetryConfig to a JVM RetryConfig. */ + def asJava: jvm.RetryConfig = + new jvm.RetryConfig( + initialDelay, + maxDelay, + backoffFactor, + maxRetries.map(Long.box).getOrElse(null), + totalSoftTimeout.getOrElse(null), + perTryHardTimeout.getOrElse(null) + ) +} + +object RetryConfig { + + /** Default retry configuration with reasonable defaults for database operations: + * - 5 retries maximum + * - 30 second total timeout + * - 10 second per-try timeout + * - 100ms initial delay + * - 5 second max delay + * - 2.0 backoff factor (exponential doubling) + */ + val DEFAULT: RetryConfig = + jvm.RetryConfig.DEFAULT.asScala + + /** No retries - operations fail immediately on first error. */ + val NO_RETRIES: RetryConfig = + jvm.RetryConfig.NO_RETRIES.asScala + + /** Conversion extension for JVM RetryConfig. */ + extension (javaConfig: jvm.RetryConfig) { + + /** Converts a JVM RetryConfig to a Scala RetryConfig. */ + def asScala: RetryConfig = + RetryConfig( + initialDelay = javaConfig.initialDelay, + maxDelay = javaConfig.maxDelay, + backoffFactor = javaConfig.backoffFactor, + maxRetries = Option(javaConfig.maxRetries).map(_.longValue), + totalSoftTimeout = Option(javaConfig.totalSoftTimeout), + perTryHardTimeout = Option(javaConfig.perTryHardTimeout) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala new file mode 100644 index 0000000..2679f04 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala @@ -0,0 +1,142 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import org.funfix.delayedqueue.jvm + +/** Represents a message scheduled for future delivery in the delayed queue. + * + * This is the primary data structure for messages that will be processed at a specific time in the + * future. + * + * @tparam A + * the type of the message payload + * @param key + * unique identifier for this message; can be used to update or delete the message + * @param payload + * the actual message content + * @param scheduleAt + * the timestamp when this message becomes available for polling + * @param canUpdate + * whether existing messages with the same key can be updated + */ +final case class ScheduledMessage[+A]( + key: String, + payload: A, + scheduleAt: Instant, + canUpdate: Boolean = true +) { + + /** Converts this Scala ScheduledMessage to a JVM ScheduledMessage. */ + def asJava[A1 >: A]: jvm.ScheduledMessage[A1] = + new jvm.ScheduledMessage[A1](key, payload, scheduleAt, canUpdate) +} + +object ScheduledMessage { + + /** Conversion extension for JVM ScheduledMessage. */ + extension [A](javaMsg: jvm.ScheduledMessage[A]) { + + /** Converts a JVM ScheduledMessage to a Scala ScheduledMessage. */ + def asScala: ScheduledMessage[A] = + ScheduledMessage( + key = javaMsg.key, + payload = javaMsg.payload, + scheduleAt = javaMsg.scheduleAt, + canUpdate = javaMsg.canUpdate + ) + } +} + +/** Wrapper for batched message operations, associating input metadata with scheduled messages. + * + * @tparam In + * the type of the input metadata + * @tparam A + * the type of the message payload + * @param input + * the original input metadata + * @param message + * the scheduled message + */ +final case class BatchedMessage[+In, +A]( + input: In, + message: ScheduledMessage[A] +) { + + /** Creates a reply for this batched message with the given outcome. */ + def reply(outcome: OfferOutcome): BatchedReply[In, A] = + BatchedReply(input, message, outcome) + + /** Converts this Scala BatchedMessage to a JVM BatchedMessage. */ + def asJava[In1 >: In, A1 >: A]: jvm.BatchedMessage[In1, A1] = + new jvm.BatchedMessage[In1, A1](input, message.asJava) +} + +object BatchedMessage { + + /** Conversion extension for JVM BatchedMessage. */ + extension [In, A](javaMsg: jvm.BatchedMessage[In, A]) { + + /** Converts a JVM BatchedMessage to a Scala BatchedMessage. */ + def asScala: BatchedMessage[In, A] = + BatchedMessage( + input = javaMsg.input, + message = ScheduledMessage.asScala(javaMsg.message) + ) + } +} + +/** Reply for a batched message operation, containing the outcome. + * + * @tparam In + * the type of the input metadata + * @tparam A + * the type of the message payload + * @param input + * the original input metadata + * @param message + * the scheduled message + * @param outcome + * the result of offering this message + */ +final case class BatchedReply[+In, +A]( + input: In, + message: ScheduledMessage[A], + outcome: OfferOutcome +) { + + /** Converts this Scala BatchedReply to a JVM BatchedReply. */ + def asJava[In1 >: In, A1 >: A]: jvm.BatchedReply[In1, A1] = + new jvm.BatchedReply[In1, A1](input, message.asJava, outcome.asJava) +} + +object BatchedReply { + + /** Conversion extension for JVM BatchedReply. */ + extension [In, A](javaReply: jvm.BatchedReply[In, A]) { + + /** Converts a JVM BatchedReply to a Scala BatchedReply. */ + def asScala: BatchedReply[In, A] = + BatchedReply( + input = javaReply.input, + message = ScheduledMessage.asScala(javaReply.message), + outcome = OfferOutcome.asScala(javaReply.outcome) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala new file mode 100644 index 0000000..1dea0c0 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +/** Checked exception thrown in case of exceptions happening that are not recoverable, rendering + * DelayedQueue inaccessible. + * + * Example: issues with the RDBMS (bugs, or connection unavailable, failing after multiple retries) + */ +class ResourceUnavailableException(message: String, cause: Throwable | Null) + extends Exception(message, cause) { + + def this(message: String) = this(message, null) +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala new file mode 100644 index 0000000..ae7b7ee --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala @@ -0,0 +1,122 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneId +import java.time.Duration +import org.funfix.delayedqueue.scala.CronConfigHash.asScala +import org.funfix.delayedqueue.scala.CronMessage.asScala as cronAsScala +import org.funfix.delayedqueue.scala.CronDailySchedule.asScala as scheduleAsScala + +class CronSpec extends munit.FunSuite { + + test("CronConfigHash fromString should be deterministic") { + val text = "test-config" + val hash1 = CronConfigHash.fromString(text) + val hash2 = CronConfigHash.fromString(text) + + assertEquals(hash1.value, hash2.value) + } + + test("CronConfigHash fromDailyCron should create hash") { + val schedule = CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List(LocalTime.of(10, 0)), + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ofHours(1) + ) + + val hash = CronConfigHash.fromDailyCron(schedule) + assert(hash.value.nonEmpty) + } + + test("CronConfigHash fromPeriodicTick should create hash") { + val hash = CronConfigHash.fromPeriodicTick(Duration.ofMinutes(5)) + assert(hash.value.nonEmpty) + } + + test("CronMessage key should be unique for different times") { + val hash = CronConfigHash.fromString("test") + val prefix = "test-prefix" + + val key1 = CronMessage.key(hash, prefix, Instant.ofEpochMilli(1000)) + val key2 = CronMessage.key(hash, prefix, Instant.ofEpochMilli(2000)) + + assertNotEquals(key1, key2) + } + + test("CronMessage toScheduled should create ScheduledMessage") { + val cronMsg = CronMessage( + payload = "test-payload", + scheduleAt = Instant.ofEpochMilli(1000) + ) + + val hash = CronConfigHash.fromString("test") + val scheduled = cronMsg.toScheduled(hash, "prefix", canUpdate = true) + + assertEquals(scheduled.payload, "test-payload") + assertEquals(scheduled.scheduleAt, Instant.ofEpochMilli(1000)) + assertEquals(scheduled.canUpdate, true) + } + + test("CronMessage staticPayload should create generator") { + val generator = CronMessage.staticPayload("static-payload") + val instant = Instant.ofEpochMilli(1000) + val cronMsg = generator(instant) + + assertEquals(cronMsg.payload, "static-payload") + assertEquals(cronMsg.scheduleAt, instant) + } + + test("CronDailySchedule getNextTimes should return at least one time") { + val schedule = CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List(LocalTime.of(10, 0)), + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ofHours(1) + ) + + val now = Instant.parse("2024-01-01T08:00:00Z") + val nextTimes = schedule.getNextTimes(now) + + assert(nextTimes.nonEmpty) + } + + test("CronDailySchedule should validate non-empty hours") { + intercept[IllegalArgumentException] { + CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List.empty, + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ofHours(1) + ) + } + } + + test("CronDailySchedule should validate positive schedule interval") { + intercept[IllegalArgumentException] { + CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List(LocalTime.of(10, 0)), + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ZERO + ) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala new file mode 100644 index 0000000..5858825 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala @@ -0,0 +1,170 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import munit.ScalaCheckSuite +import org.scalacheck.Prop.* +import org.scalacheck.Gen +import scala.concurrent.duration.FiniteDuration +import java.time.{Instant, LocalTime, ZoneId} +import org.funfix.delayedqueue.scala.ScheduledMessage.asScala as scheduledAsScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala as timeConfigAsScala +import org.funfix.delayedqueue.scala.MessageId.asScala as messageIdAsScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerOutcomeAsScala +import org.funfix.delayedqueue.scala.Generators.{given, *} + +class DataStructuresPropertySpec extends ScalaCheckSuite { + property("ScheduledMessage asJava/asScala roundtrip preserves data") { + forAll { (key: String, payload: String, instant: Instant, canUpdate: Boolean) => + val original = ScheduledMessage(key, payload, instant, canUpdate) + val roundtripped = original.asJava.scheduledAsScala + + assertEquals(roundtripped.key, original.key) + assertEquals(roundtripped.payload, original.payload) + assertEquals(roundtripped.scheduleAt, original.scheduleAt) + assertEquals(roundtripped.canUpdate, original.canUpdate) + } + } + + property("DelayedQueueTimeConfig asJava/asScala roundtrip preserves data") { + forAll { (acquireTimeout: FiniteDuration, pollPeriod: FiniteDuration) => + val original = DelayedQueueTimeConfig(acquireTimeout, pollPeriod) + val roundtripped = original.asJava.timeConfigAsScala + + assertEquals(roundtripped.acquireTimeout.toMillis, original.acquireTimeout.toMillis) + assertEquals(roundtripped.pollPeriod.toMillis, original.pollPeriod.toMillis) + } + } + + property("MessageId is symmetric") { + forAll { (value: String) => + val messageId = MessageId(value) + assertEquals(messageId.value, value) + assertEquals(messageId.asJava.messageIdAsScala.value, value) + } + } + + property("CronConfigHash fromString is deterministic") { + forAll { (input: String) => + val hash1 = CronConfigHash.fromString(input) + val hash2 = CronConfigHash.fromString(input) + assertEquals(hash1.value, hash2.value) + } + } + + property("CronMessage key is unique for different times") { + forAll { (instant1: Instant, instant2: Instant) => + if instant1 != instant2 then { + val hash = CronConfigHash.fromString("test") + val prefix = "test-prefix" + val key1 = CronMessage.key(hash, prefix, instant1) + val key2 = CronMessage.key(hash, prefix, instant2) + assertNotEquals(key1, key2) + } + } + } + + property("BatchedMessage covariance works correctly") { + forAll { (input: Int, payload: String, instant: Instant) => + val message = ScheduledMessage(s"key-$input", payload, instant) + val batched: BatchedMessage[Int, String] = BatchedMessage(input, message) + // Covariance allows us to upcast + val widened: BatchedMessage[Any, Any] = batched + assertEquals(widened.input, input) + } + } + + property("BatchedReply covariance works correctly") { + forAll { (input: Int, payload: String, instant: Instant) => + val message = ScheduledMessage(s"key-$input", payload, instant) + val reply: BatchedReply[Int, String] = BatchedReply(input, message, OfferOutcome.Created) + // Covariance allows us to upcast + val widened: BatchedReply[Any, Any] = reply + assertEquals(widened.input, input) + } + } + + property("CronDailySchedule getNextTimes always returns at least one time") { + forAll { (hours: List[LocalTime], now: Instant, zoneId: ZoneId) => + if hours.nonEmpty then { + val schedule = CronDailySchedule( + zoneId = zoneId, + hoursOfDay = hours, + scheduleInAdvance = java.time.Duration.ofDays(1), + scheduleInterval = java.time.Duration.ofHours(1) + ) + val nextTimes = schedule.getNextTimes(now) + assert(nextTimes.nonEmpty, "getNextTimes should return at least one time") + assert(nextTimes.head.isAfter(now) || nextTimes.head == now, "First time should be >= now") + } + } + } + + property("OfferOutcome asJava/asScala roundtrip") { + forAll(Gen.oneOf(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored)) { + outcome => + val roundtripped = outcome.asJava.offerOutcomeAsScala + assertEquals(roundtripped, outcome) + } + } + + property("DeliveryType asJava/asScala roundtrip") { + import org.funfix.delayedqueue.scala.DeliveryType.asScala + forAll(Gen.oneOf(DeliveryType.FirstDelivery, DeliveryType.Redelivery)) { deliveryType => + val roundtripped = deliveryType.asJava.asScala + assertEquals(roundtripped, deliveryType) + } + } + + property("RetryConfig validates backoffFactor >= 1.0") { + forAll { (backoffFactor: Double) => + // Simplified test: just check that invalid backoff factors are rejected + if backoffFactor < 1.0 && !backoffFactor.isNaN && !backoffFactor.isInfinite then { + val _ = intercept[IllegalArgumentException] { + RetryConfig( + initialDelay = java.time.Duration.ofMillis(100), + maxDelay = java.time.Duration.ofMillis(1000), + backoffFactor = backoffFactor, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None + ) + } + () + } else if backoffFactor >= 1.0 && !backoffFactor.isNaN && !backoffFactor.isInfinite then { + val config = RetryConfig( + initialDelay = java.time.Duration.ofMillis(100), + maxDelay = java.time.Duration.ofMillis(1000), + backoffFactor = backoffFactor, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None + ) + assertEquals(config.backoffFactor >= 1.0, true) + } + } + } + + property("JdbcDriver fromClassName is case-insensitive") { + forAll(Gen.oneOf(JdbcDriver.entries)) { driver => + val lower = JdbcDriver.fromClassName(driver.className.toLowerCase) + val upper = JdbcDriver.fromClassName(driver.className.toUpperCase) + assertEquals(lower, Some(driver)) + assertEquals(upper, Some(driver)) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala new file mode 100644 index 0000000..4da365c --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.scalacheck.{Arbitrary, Gen} +import scala.concurrent.duration.* +import java.time.{Instant, LocalTime, ZoneId} + +/** Common ScalaCheck generators for DelayedQueue data structures. */ +object Generators { + + implicit val arbFiniteDuration: Arbitrary[FiniteDuration] = Arbitrary( + Gen.choose(0L, 1000000L).map(millis => FiniteDuration(millis, MILLISECONDS)) + ) + + implicit val arbInstant: Arbitrary[Instant] = Arbitrary( + Gen.choose(0L, System.currentTimeMillis() * 2).map(Instant.ofEpochMilli) + ) + + implicit val arbLocalTime: Arbitrary[LocalTime] = Arbitrary( + for { + hour <- Gen.choose(0, 23) + minute <- Gen.choose(0, 59) + } yield LocalTime.of(hour, minute) + ) + + implicit val arbZoneId: Arbitrary[ZoneId] = Arbitrary( + Gen.oneOf(ZoneId.of("UTC"), ZoneId.of("America/New_York"), ZoneId.of("Europe/London")) + ) +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala new file mode 100644 index 0000000..707cb05 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.scala.JdbcDriver.asScala + +class JdbcDriverSpec extends munit.FunSuite { + + test("JdbcDriver constants should have correct class names") { + assertEquals(JdbcDriver.HSQLDB.className, "org.hsqldb.jdbc.JDBCDriver") + assertEquals(JdbcDriver.H2.className, "org.h2.Driver") + assertEquals(JdbcDriver.PostgreSQL.className, "org.postgresql.Driver") + assertEquals(JdbcDriver.MySQL.className, "com.mysql.cj.jdbc.Driver") + assertEquals(JdbcDriver.MariaDB.className, "org.mariadb.jdbc.Driver") + assertEquals(JdbcDriver.Sqlite.className, "org.sqlite.JDBC") + assertEquals(JdbcDriver.MsSqlServer.className, "com.microsoft.sqlserver.jdbc.SQLServerDriver") + assertEquals(JdbcDriver.Oracle.className, "oracle.jdbc.OracleDriver") + } + + test("JdbcDriver entries should contain all drivers") { + val allDrivers = Set( + JdbcDriver.HSQLDB, + JdbcDriver.H2, + JdbcDriver.PostgreSQL, + JdbcDriver.MySQL, + JdbcDriver.MariaDB, + JdbcDriver.Sqlite, + JdbcDriver.MsSqlServer, + JdbcDriver.Oracle + ) + + assertEquals(JdbcDriver.entries.toSet, allDrivers) + } + + test("asJava and asScala should be symmetric") { + JdbcDriver.entries.foreach { driver => + val roundtripped = driver.asJava.asScala + assertEquals(roundtripped.className, driver.className) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala new file mode 100644 index 0000000..055299e --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.RetryConfig.asScala + +class RetryConfigSpec extends munit.FunSuite { + + test("DEFAULT should have correct values") { + assertEquals(RetryConfig.DEFAULT.maxRetries, Some(5L)) + assertEquals(RetryConfig.DEFAULT.totalSoftTimeout, Some(Duration.ofSeconds(30))) + assertEquals(RetryConfig.DEFAULT.perTryHardTimeout, Some(Duration.ofSeconds(10))) + assertEquals(RetryConfig.DEFAULT.initialDelay, Duration.ofMillis(100)) + assertEquals(RetryConfig.DEFAULT.maxDelay, Duration.ofSeconds(5)) + assertEquals(RetryConfig.DEFAULT.backoffFactor, 2.0) + } + + test("NO_RETRIES should have correct values") { + assertEquals(RetryConfig.NO_RETRIES.maxRetries, Some(0L)) + assertEquals(RetryConfig.NO_RETRIES.totalSoftTimeout, None) + assertEquals(RetryConfig.NO_RETRIES.perTryHardTimeout, None) + assertEquals(RetryConfig.NO_RETRIES.backoffFactor, 1.0) + } + + test("asJava and asScala should be symmetric") { + val original = RetryConfig.DEFAULT + val roundtripped = original.asJava.asScala + + assertEquals(roundtripped.maxRetries, original.maxRetries) + assertEquals(roundtripped.totalSoftTimeout, original.totalSoftTimeout) + assertEquals(roundtripped.perTryHardTimeout, original.perTryHardTimeout) + assertEquals(roundtripped.initialDelay, original.initialDelay) + assertEquals(roundtripped.maxDelay, original.maxDelay) + assertEquals(roundtripped.backoffFactor, original.backoffFactor) + } + + test("should validate backoffFactor >= 1.0") { + intercept[IllegalArgumentException] { + RetryConfig( + initialDelay = Duration.ofMillis(100), + maxDelay = Duration.ofSeconds(5), + backoffFactor = 0.5, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None + ) + } + } + + test("should validate non-negative delays") { + intercept[IllegalArgumentException] { + RetryConfig( + initialDelay = Duration.ofMillis(-100), + maxDelay = Duration.ofSeconds(5), + backoffFactor = 2.0, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None + ) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala new file mode 100644 index 0000000..ffa6515 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import org.funfix.delayedqueue.scala.ScheduledMessage.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerAsScala + +class ScheduledMessageSpec extends munit.FunSuite { + + test("ScheduledMessage asJava and asScala should be symmetric") { + val original = ScheduledMessage( + key = "test-key", + payload = "test-payload", + scheduleAt = Instant.ofEpochMilli(1000), + canUpdate = true + ) + + val roundtripped = original.asJava.asScala + + assertEquals(roundtripped.key, original.key) + assertEquals(roundtripped.payload, original.payload) + assertEquals(roundtripped.scheduleAt, original.scheduleAt) + assertEquals(roundtripped.canUpdate, original.canUpdate) + } + + test("BatchedMessage reply should create BatchedReply") { + val message = ScheduledMessage( + key = "test-key", + payload = "test-payload", + scheduleAt = Instant.ofEpochMilli(1000) + ) + + val batched = BatchedMessage(input = 42, message = message) + val reply = batched.reply(OfferOutcome.Created) + + assertEquals(reply.input, 42) + assertEquals(reply.message, message) + assertEquals(reply.outcome, OfferOutcome.Created) + } + + test("OfferOutcome isIgnored should work correctly") { + assert(OfferOutcome.Ignored.isIgnored) + assert(!OfferOutcome.Created.isIgnored) + assert(!OfferOutcome.Updated.isIgnored) + } + + test("OfferOutcome asJava and asScala should be symmetric") { + val outcomes = List(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored) + + outcomes.foreach { outcome => + val roundtripped = outcome.asJava.offerAsScala + assertEquals(roundtripped, outcome) + } + } +}