From 0c5879e8330488e7a3bda1a80edc9ffe9c7d0e4e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 10:01:35 +0000 Subject: [PATCH 1/7] Initial plan From ac283e267af9871f116b8d90c7575359f577d3a4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 10:17:33 +0000 Subject: [PATCH 2/7] Add Scala data structures for DelayedQueue - Created all configuration types (RetryConfig, JdbcDatabasePoolConfig, DelayedQueueTimeConfig, JdbcConnectionConfig, DelayedQueueJDBCConfig) - Created message and envelope types (ScheduledMessage, BatchedMessage, BatchedReply, AckEnvelope, MessageId, DeliveryType) - Created cron/scheduling types (CronMessage, CronConfigHash, CronDailySchedule, CronMessageGenerator) - Created return/outcome types (OfferOutcome, ResourceUnavailableException, JdbcDriver) - Added asJava and asScala conversion extensions for all types - All code compiles and is formatted Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../delayedqueue/scala/AckEnvelope.scala | 133 ++++++++++++++++ .../delayedqueue/scala/CronConfigHash.scala | 77 ++++++++++ .../scala/CronDailySchedule.scala | 131 ++++++++++++++++ .../delayedqueue/scala/CronMessage.scala | 124 +++++++++++++++ .../scala/DelayedQueueJDBCConfig.scala | 126 ++++++++++++++++ .../scala/DelayedQueueTimeConfig.scala | 75 +++++++++ .../scala/JdbcConnectionConfig.scala | 70 +++++++++ .../scala/JdbcDatabasePoolConfig.scala | 84 +++++++++++ .../delayedqueue/scala/JdbcDriver.scala | 71 +++++++++ .../delayedqueue/scala/OfferOutcome.scala | 61 ++++++++ .../delayedqueue/scala/RetryConfig.scala | 131 ++++++++++++++++ .../delayedqueue/scala/ScheduledMessage.scala | 142 ++++++++++++++++++ .../delayedqueue/scala/exceptions.scala | 28 ++++ 13 files changed, 1253 insertions(+) create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala create mode 100644 delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala new file mode 100644 index 0000000..38b8f82 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala @@ -0,0 +1,133 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import cats.effect.IO +import java.time.Instant +import org.funfix.delayedqueue.jvm + +/** Message envelope that includes an acknowledgment callback. + * + * This wrapper is returned when polling messages from the queue. It contains the message payload + * plus metadata and an acknowledgment function that should be called after processing completes. + * + * This type is not serializable. + * + * ==Example== + * + * {{{ + * for { + * envelope <- queue.poll + * _ <- processMessage(envelope.payload) + * _ <- envelope.acknowledge + * } yield () + * }}} + * + * @tparam A + * the type of the message payload + * @param payload + * the actual message content + * @param messageId + * unique identifier for tracking this message + * @param timestamp + * when this envelope was created (poll time) + * @param source + * identifier for the queue or source system + * @param deliveryType + * indicates whether this is the first delivery or a redelivery + * @param acknowledge + * IO action to call to acknowledge successful processing, and delete the message from the queue + */ +final case class AckEnvelope[+A]( + payload: A, + messageId: MessageId, + timestamp: Instant, + source: String, + deliveryType: DeliveryType, + acknowledge: IO[Unit] +) + +object AckEnvelope { + + /** Conversion extension for JVM AckEnvelope. */ + extension [A](javaEnv: jvm.AckEnvelope[A]) { + + /** Converts a JVM AckEnvelope to a Scala AckEnvelope. */ + def asScala: AckEnvelope[A] = + AckEnvelope( + payload = javaEnv.payload, + messageId = MessageId.asScala(javaEnv.messageId), + timestamp = javaEnv.timestamp, + source = javaEnv.source, + deliveryType = DeliveryType.asScala(javaEnv.deliveryType), + acknowledge = IO.blocking(javaEnv.acknowledge()) + ) + } +} + +/** Unique identifier for a message. + * + * @param value + * the string representation of the message ID + */ +final case class MessageId(value: String) { + override def toString: String = value + + /** Converts this Scala MessageId to a JVM MessageId. */ + def asJava: jvm.MessageId = + new jvm.MessageId(value) +} + +object MessageId { + + /** Conversion extension for JVM MessageId. */ + extension (javaId: jvm.MessageId) { + + /** Converts a JVM MessageId to a Scala MessageId. */ + def asScala: MessageId = + MessageId(javaId.value) + } +} + +/** Indicates whether a message is being delivered for the first time or redelivered. */ +enum DeliveryType { + + /** Message is being delivered for the first time. */ + case FIRST_DELIVERY + + /** Message is being redelivered (was scheduled again after initial delivery). */ + case REDELIVERY + + /** Converts this Scala DeliveryType to a JVM DeliveryType. */ + def asJava: jvm.DeliveryType = this match { + case FIRST_DELIVERY => jvm.DeliveryType.FIRST_DELIVERY + case REDELIVERY => jvm.DeliveryType.REDELIVERY + } +} + +object DeliveryType { + + /** Conversion extension for JVM DeliveryType. */ + extension (javaType: jvm.DeliveryType) { + + /** Converts a JVM DeliveryType to a Scala DeliveryType. */ + def asScala: DeliveryType = javaType match { + case jvm.DeliveryType.FIRST_DELIVERY => DeliveryType.FIRST_DELIVERY + case jvm.DeliveryType.REDELIVERY => DeliveryType.REDELIVERY + } + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala new file mode 100644 index 0000000..241676f --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.security.MessageDigest +import java.time.Duration +import org.funfix.delayedqueue.jvm + +/** Hash of a cron configuration, used to detect configuration changes. + * + * When a cron schedule is installed, this hash is used to identify messages belonging to that + * configuration. If the configuration changes, the hash will differ, allowing the system to clean + * up old scheduled messages. + * + * @param value + * the MD5 hash string + */ +final case class CronConfigHash(value: String) { + override def toString: String = value + + /** Converts this Scala CronConfigHash to a JVM CronConfigHash. */ + def asJava: jvm.CronConfigHash = + new jvm.CronConfigHash(value) +} + +object CronConfigHash { + + /** Creates a ConfigHash from a daily cron schedule configuration. */ + def fromDailyCron(config: CronDailySchedule): CronConfigHash = { + val text = s""" + |daily-cron: + | zone: ${config.zoneId} + | hours: ${config.hoursOfDay.mkString(", ")} + |""".stripMargin + CronConfigHash(md5(text)) + } + + /** Creates a ConfigHash from a periodic tick configuration. */ + def fromPeriodicTick(period: Duration): CronConfigHash = { + val text = s""" + |periodic-tick: + | period-ms: ${period.toMillis} + |""".stripMargin + CronConfigHash(md5(text)) + } + + /** Creates a ConfigHash from an arbitrary string. */ + def fromString(text: String): CronConfigHash = CronConfigHash(md5(text)) + + private def md5(input: String): String = { + val md = MessageDigest.getInstance("MD5") + val digest = md.digest(input.getBytes) + digest.map("%02x".format(_)).mkString + } + + /** Conversion extension for JVM CronConfigHash. */ + extension (javaHash: jvm.CronConfigHash) { + + /** Converts a JVM CronConfigHash to a Scala CronConfigHash. */ + def asScala: CronConfigHash = + CronConfigHash(javaHash.value) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala new file mode 100644 index 0000000..5ead708 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala @@ -0,0 +1,131 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneId +import org.funfix.delayedqueue.jvm +import scala.jdk.CollectionConverters.* + +/** Configuration for daily recurring scheduled messages with timezone support. + * + * This class defines when messages should be scheduled each day, with support for multiple times + * per day and scheduling messages in advance. + * + * @param zoneId + * the timezone for interpreting the hours of day + * @param hoursOfDay + * the times during each day when messages should be scheduled (must not be empty) + * @param scheduleInAdvance + * how far in advance to schedule messages + * @param scheduleInterval + * how often to check and update the schedule + */ +final case class CronDailySchedule( + zoneId: ZoneId, + hoursOfDay: List[LocalTime], + scheduleInAdvance: Duration, + scheduleInterval: Duration +) { + require(hoursOfDay.nonEmpty, "hoursOfDay must not be empty") + require( + !scheduleInterval.isZero && !scheduleInterval.isNegative, + "scheduleInterval must be positive" + ) + + /** Calculates the next scheduled times starting from now. + * + * Returns all times that should be scheduled, from now until (now + scheduleInAdvance). Always + * returns at least one time (the next scheduled time), even if it's beyond scheduleInAdvance. + * + * @param now + * the current time + * @return + * list of future instants when messages should be scheduled (never empty) + */ + def getNextTimes(now: Instant): List[Instant] = { + val until = now.plus(scheduleInAdvance) + val sortedHours = hoursOfDay.sortBy(identity) + + def getNextTime(currentTime: Instant): Instant = { + val zonedDateTime = currentTime.atZone(zoneId) + val localNow = zonedDateTime.toLocalTime + + // Find the next hour today + val nextHourToday = sortedHours.find(_.isAfter(localNow)) + + nextHourToday match { + case Some(nextHour) => + // Schedule for today + nextHour.atDate(zonedDateTime.toLocalDate).atZone(zoneId).toInstant + case None => + // Schedule for tomorrow (first hour of the day) + sortedHours.head + .atDate(zonedDateTime.toLocalDate.plusDays(1)) + .atZone(zoneId) + .toInstant + } + } + + def loop(currentTime: Instant, acc: List[Instant]): List[Instant] = { + val nextTime = getNextTime(currentTime) + if acc.nonEmpty && nextTime.isAfter(until) then { + acc.reverse + } else { + loop(nextTime, nextTime :: acc) + } + } + + loop(now, Nil) + } + + /** Converts this Scala CronDailySchedule to a JVM CronDailySchedule. */ + def asJava: jvm.CronDailySchedule = + new jvm.CronDailySchedule( + zoneId, + hoursOfDay.asJava, + scheduleInAdvance, + scheduleInterval + ) +} + +object CronDailySchedule { + + /** Creates a DailyCronSchedule with the specified configuration. */ + def create( + zoneId: ZoneId, + hoursOfDay: List[LocalTime], + scheduleInAdvance: Duration, + scheduleInterval: Duration + ): CronDailySchedule = + CronDailySchedule(zoneId, hoursOfDay, scheduleInAdvance, scheduleInterval) + + /** Conversion extension for JVM CronDailySchedule. */ + extension (javaSchedule: jvm.CronDailySchedule) { + + /** Converts a JVM CronDailySchedule to a Scala CronDailySchedule. */ + def asScala: CronDailySchedule = + CronDailySchedule( + zoneId = javaSchedule.zoneId, + hoursOfDay = javaSchedule.hoursOfDay.asScala.toList, + scheduleInAdvance = javaSchedule.scheduleInAdvance, + scheduleInterval = javaSchedule.scheduleInterval + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala new file mode 100644 index 0000000..4557ec1 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala @@ -0,0 +1,124 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.util.Locale +import org.funfix.delayedqueue.jvm + +/** Represents a message for periodic (cron-like) scheduling. + * + * This wrapper is used for messages that should be scheduled repeatedly. The `scheduleAt` is used + * to generate the unique key, while `scheduleAtActual` allows for a different execution time + * (e.g., to add a delay). + * + * @tparam A + * the type of the message payload + * @param payload + * the message content + * @param scheduleAt + * the nominal schedule time (used for key generation) + * @param scheduleAtActual + * the actual execution time (defaults to scheduleAt if None) + */ +final case class CronMessage[+A]( + payload: A, + scheduleAt: Instant, + scheduleAtActual: Option[Instant] = None +) { + + /** Converts this CronMessage to a ScheduledMessage. + * + * @param configHash + * the configuration hash for this cron job + * @param keyPrefix + * the prefix for generating unique keys + * @param canUpdate + * whether the resulting message can update existing entries + */ + def toScheduled( + configHash: CronConfigHash, + keyPrefix: String, + canUpdate: Boolean + ): ScheduledMessage[A] = + ScheduledMessage( + key = CronMessage.key(configHash, keyPrefix, scheduleAt), + payload = payload, + scheduleAt = scheduleAtActual.getOrElse(scheduleAt), + canUpdate = canUpdate + ) + + /** Converts this Scala CronMessage to a JVM CronMessage. */ + def asJava[A1 >: A]: jvm.CronMessage[A1] = + new jvm.CronMessage[A1](payload, scheduleAt, scheduleAtActual.getOrElse(null)) +} + +object CronMessage { + private val CRON_DATE_TIME_FORMATTER: DateTimeFormatter = + DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(ZoneOffset.UTC) + private val NANOS_WIDTH = 9 + + private def formatTimestamp(scheduleAt: Instant): String = { + val nanos = String.format(Locale.ROOT, s"%0${NANOS_WIDTH}d", scheduleAt.getNano: Integer) + s"${CRON_DATE_TIME_FORMATTER.format(scheduleAt)}.$nanos" + } + + /** Generates a unique key for a cron message. + * + * @param configHash + * the configuration hash + * @param keyPrefix + * the key prefix + * @param scheduleAt + * the schedule time + * @return + * a unique key string + */ + def key(configHash: CronConfigHash, keyPrefix: String, scheduleAt: Instant): String = + s"$keyPrefix/${configHash.value}/${formatTimestamp(scheduleAt)}" + + /** Creates a factory function that produces CronMessages with a static payload. + * + * @param payload + * the static payload to use for all generated messages + * @return + * a function that creates CronMessages for any given instant + */ + def staticPayload[A](payload: A): CronMessageGenerator[A] = + (scheduleAt: Instant) => CronMessage(payload, scheduleAt) + + /** Conversion extension for JVM CronMessage. */ + extension [A](javaMsg: jvm.CronMessage[A]) { + + /** Converts a JVM CronMessage to a Scala CronMessage. */ + def asScala: CronMessage[A] = + CronMessage( + payload = javaMsg.payload, + scheduleAt = javaMsg.scheduleAt, + scheduleAtActual = Option(javaMsg.scheduleAtActual) + ) + } +} + +/** Function that generates CronMessages for given instants. */ +trait CronMessageGenerator[A] { + + /** Creates a cron message for the given instant. */ + def apply(at: Instant): CronMessage[A] +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala new file mode 100644 index 0000000..89f2ed1 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Configuration for JDBC-based delayed queue instances. + * + * This configuration groups together all settings needed to create a DelayedQueueJDBC instance. + * + * ==Example== + * + * {{{ + * val dbConfig = JdbcConnectionConfig( + * url = "jdbc:hsqldb:mem:testdb", + * driver = JdbcDriver.HSQLDB, + * username = "SA", + * password = "", + * pool = null + * ) + * + * val config = DelayedQueueJDBCConfig( + * db = dbConfig, + * tableName = "delayed_queue_table", + * time = DelayedQueueTimeConfig.DEFAULT_JDBC, + * queueName = "my-queue", + * ackEnvSource = "DelayedQueueJDBC:my-queue", + * retryPolicy = Some(RetryConfig.DEFAULT) + * ) + * }}} + * + * @param db + * JDBC connection configuration + * @param tableName + * Name of the database table to use for storing queue messages + * @param time + * Time configuration for queue operations (poll periods, timeouts, etc.) + * @param queueName + * Unique name for this queue instance, used for partitioning messages in shared tables. Multiple + * queue instances can share the same database table if they have different queue names. + * @param ackEnvSource + * Source identifier for acknowledgement envelopes, used for tracing and debugging. Typically, + * follows the pattern "DelayedQueueJDBC:{queueName}". + * @param retryPolicy + * Optional retry configuration for database operations. If None, uses RetryConfig.DEFAULT. + */ +final case class DelayedQueueJDBCConfig( + db: JdbcConnectionConfig, + tableName: String, + time: DelayedQueueTimeConfig, + queueName: String, + ackEnvSource: String = "", + retryPolicy: Option[RetryConfig] = None +) { + require(tableName.nonEmpty, "tableName must not be blank") + require(queueName.nonEmpty, "queueName must not be blank") + require(ackEnvSource.nonEmpty, "ackEnvSource must not be blank") + + /** Converts this Scala DelayedQueueJDBCConfig to a JVM DelayedQueueJDBCConfig. */ + def asJava: jvm.DelayedQueueJDBCConfig = + new jvm.DelayedQueueJDBCConfig( + db.asJava, + tableName, + time.asJava, + queueName, + ackEnvSource, + retryPolicy.map(_.asJava).getOrElse(null) + ) +} + +object DelayedQueueJDBCConfig { + + /** Creates a default configuration for the given database, table name, and queue name. + * + * @param db + * JDBC connection configuration + * @param tableName + * Name of the database table to use + * @param queueName + * Unique name for this queue instance + * @return + * A configuration with default time and retry policies + */ + def create( + db: JdbcConnectionConfig, + tableName: String, + queueName: String + ): DelayedQueueJDBCConfig = + DelayedQueueJDBCConfig( + db = db, + tableName = tableName, + time = DelayedQueueTimeConfig.DEFAULT_JDBC, + queueName = queueName, + ackEnvSource = s"DelayedQueueJDBC:$queueName", + retryPolicy = Some(RetryConfig.DEFAULT) + ) + + /** Conversion extension for JVM DelayedQueueJDBCConfig. */ + extension (javaConfig: jvm.DelayedQueueJDBCConfig) { + + /** Converts a JVM DelayedQueueJDBCConfig to a Scala DelayedQueueJDBCConfig. */ + def asScala: DelayedQueueJDBCConfig = + DelayedQueueJDBCConfig( + db = JdbcConnectionConfig.asScala(javaConfig.db), + tableName = javaConfig.tableName, + time = DelayedQueueTimeConfig.asScala(javaConfig.time), + queueName = javaConfig.queueName, + ackEnvSource = javaConfig.ackEnvSource, + retryPolicy = Option(javaConfig.retryPolicy).map(RetryConfig.asScala) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala new file mode 100644 index 0000000..7938b8c --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm + +/** Time configuration for delayed queue operations. + * + * @param acquireTimeout + * maximum time to wait when acquiring/locking a message for processing + * @param pollPeriod + * interval between poll attempts when no messages are available + */ +final case class DelayedQueueTimeConfig( + acquireTimeout: Duration, + pollPeriod: Duration +) { + + /** Converts this Scala DelayedQueueTimeConfig to a JVM DelayedQueueTimeConfig. */ + def asJava: jvm.DelayedQueueTimeConfig = + new jvm.DelayedQueueTimeConfig(acquireTimeout, pollPeriod) +} + +object DelayedQueueTimeConfig { + + /** Default configuration for DelayedQueueInMemory. */ + val DEFAULT_IN_MEMORY: DelayedQueueTimeConfig = + DelayedQueueTimeConfig( + acquireTimeout = Duration.ofMinutes(5), + pollPeriod = Duration.ofMillis(500) + ) + + /** Default configuration for JDBC-based implementations, with longer acquire timeouts and poll + * periods to reduce database load in production environments. + */ + val DEFAULT_JDBC: DelayedQueueTimeConfig = + DelayedQueueTimeConfig( + acquireTimeout = Duration.ofMinutes(5), + pollPeriod = Duration.ofSeconds(3) + ) + + /** Default configuration for testing, with shorter timeouts and poll periods to speed up tests. + */ + val DEFAULT_TESTING: DelayedQueueTimeConfig = + DelayedQueueTimeConfig( + acquireTimeout = Duration.ofSeconds(30), + pollPeriod = Duration.ofMillis(100) + ) + + /** Conversion extension for JVM DelayedQueueTimeConfig. */ + extension (javaConfig: jvm.DelayedQueueTimeConfig) { + + /** Converts a JVM DelayedQueueTimeConfig to a Scala DelayedQueueTimeConfig. */ + def asScala: DelayedQueueTimeConfig = + DelayedQueueTimeConfig( + acquireTimeout = javaConfig.acquireTimeout, + pollPeriod = javaConfig.pollPeriod + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala new file mode 100644 index 0000000..8abd128 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Represents the configuration for a JDBC connection. + * + * @param url + * the JDBC connection URL + * @param driver + * the JDBC driver to use + * @param username + * optional username for authentication + * @param password + * optional password for authentication + * @param pool + * optional connection pool configuration + */ +final case class JdbcConnectionConfig( + url: String, + driver: JdbcDriver, + username: String | Null = null, + password: String | Null = null, + pool: JdbcDatabasePoolConfig | Null = null +) { + + /** Converts this Scala JdbcConnectionConfig to a JVM JdbcConnectionConfig. */ + def asJava: jvm.JdbcConnectionConfig = + new jvm.JdbcConnectionConfig( + url, + driver.asJava, + username, + password, + if pool == null then null else pool.asJava + ) +} + +object JdbcConnectionConfig { + + /** Conversion extension for JVM JdbcConnectionConfig. */ + extension (javaConfig: jvm.JdbcConnectionConfig) { + + /** Converts a JVM JdbcConnectionConfig to a Scala JdbcConnectionConfig. */ + def asScala: JdbcConnectionConfig = + JdbcConnectionConfig( + url = javaConfig.url, + driver = JdbcDriver.asScala(javaConfig.driver), + username = javaConfig.username, + password = javaConfig.password, + pool = + if javaConfig.pool == null then null + else JdbcDatabasePoolConfig.asScala(javaConfig.pool) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala new file mode 100644 index 0000000..cd60f34 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm + +/** Configuration for tuning the Hikari connection pool. + * + * @param connectionTimeout + * maximum time to wait for a connection from the pool + * @param idleTimeout + * maximum time a connection can sit idle in the pool + * @param maxLifetime + * maximum lifetime of a connection in the pool + * @param keepaliveTime + * frequency of keepalive checks + * @param maximumPoolSize + * maximum number of connections in the pool + * @param minimumIdle + * minimum number of idle connections to maintain + * @param leakDetectionThreshold + * time before a connection is considered leaked + * @param initializationFailTimeout + * time to wait for pool initialization + */ +final case class JdbcDatabasePoolConfig( + connectionTimeout: Duration = Duration.ofSeconds(30), + idleTimeout: Duration = Duration.ofMinutes(10), + maxLifetime: Duration = Duration.ofMinutes(30), + keepaliveTime: Duration = Duration.ZERO, + maximumPoolSize: Int = 10, + minimumIdle: Option[Int] = None, + leakDetectionThreshold: Option[Duration] = None, + initializationFailTimeout: Option[Duration] = None +) { + + /** Converts this Scala JdbcDatabasePoolConfig to a JVM JdbcDatabasePoolConfig. */ + def asJava: jvm.JdbcDatabasePoolConfig = + new jvm.JdbcDatabasePoolConfig( + connectionTimeout, + idleTimeout, + maxLifetime, + keepaliveTime, + maximumPoolSize, + minimumIdle.map(Int.box).getOrElse(null), + leakDetectionThreshold.getOrElse(null), + initializationFailTimeout.getOrElse(null) + ) +} + +object JdbcDatabasePoolConfig { + + /** Conversion extension for JVM JdbcDatabasePoolConfig. */ + extension (javaConfig: jvm.JdbcDatabasePoolConfig) { + + /** Converts a JVM JdbcDatabasePoolConfig to a Scala JdbcDatabasePoolConfig. */ + def asScala: JdbcDatabasePoolConfig = + JdbcDatabasePoolConfig( + connectionTimeout = javaConfig.connectionTimeout, + idleTimeout = javaConfig.idleTimeout, + maxLifetime = javaConfig.maxLifetime, + keepaliveTime = javaConfig.keepaliveTime, + maximumPoolSize = javaConfig.maximumPoolSize, + minimumIdle = Option(javaConfig.minimumIdle).map(_.intValue), + leakDetectionThreshold = Option(javaConfig.leakDetectionThreshold), + initializationFailTimeout = Option(javaConfig.initializationFailTimeout) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala new file mode 100644 index 0000000..80b7fe5 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** JDBC driver configurations. + * + * @param className + * the JDBC driver class name + */ +final case class JdbcDriver(className: String) { + + /** Converts this Scala JdbcDriver to a JVM JdbcDriver. */ + def asJava: jvm.JdbcDriver = { + // Find the corresponding JVM driver by class name + import scala.jdk.CollectionConverters.* + val jvmEntries = jvm.JdbcDriver.getEntries.asScala + jvmEntries + .find(d => d.getClassName() == className) + .getOrElse { + throw new IllegalArgumentException(s"Unknown JDBC driver class name: $className") + } + } +} + +object JdbcDriver { + val HSQLDB: JdbcDriver = JdbcDriver("org.hsqldb.jdbc.JDBCDriver") + val H2: JdbcDriver = JdbcDriver("org.h2.Driver") + val MsSqlServer: JdbcDriver = JdbcDriver("com.microsoft.sqlserver.jdbc.SQLServerDriver") + val Sqlite: JdbcDriver = JdbcDriver("org.sqlite.JDBC") + val MariaDB: JdbcDriver = JdbcDriver("org.mariadb.jdbc.Driver") + val MySQL: JdbcDriver = JdbcDriver("com.mysql.cj.jdbc.Driver") + val PostgreSQL: JdbcDriver = JdbcDriver("org.postgresql.Driver") + val Oracle: JdbcDriver = JdbcDriver("oracle.jdbc.OracleDriver") + + val entries: List[JdbcDriver] = + List(H2, HSQLDB, MariaDB, MsSqlServer, MySQL, PostgreSQL, Sqlite, Oracle) + + /** Attempt to find a JdbcDriver by its class name. + * + * @param className + * the JDBC driver class name + * @return + * the JdbcDriver if found, None otherwise + */ + def fromClassName(className: String): Option[JdbcDriver] = + entries.find(_.className.equalsIgnoreCase(className)) + + /** Conversion extension for JVM JdbcDriver. */ + extension (javaDriver: jvm.JdbcDriver) { + + /** Converts a JVM JdbcDriver to a Scala JdbcDriver. */ + def asScala: JdbcDriver = + JdbcDriver(javaDriver.getClassName()) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala new file mode 100644 index 0000000..f756d51 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.jvm + +/** Outcome of offering a message to the delayed queue. + * + * This sealed trait represents the possible results when adding or updating a message in the + * queue. + */ +sealed trait OfferOutcome { + + /** Returns true if the offer was ignored (message already exists and cannot be updated). */ + def isIgnored: Boolean = this == OfferOutcome.Ignored + + /** Converts this Scala OfferOutcome to a JVM OfferOutcome. */ + def asJava: jvm.OfferOutcome = this match { + case OfferOutcome.Created => jvm.OfferOutcome.Created.INSTANCE + case OfferOutcome.Updated => jvm.OfferOutcome.Updated.INSTANCE + case OfferOutcome.Ignored => jvm.OfferOutcome.Ignored.INSTANCE + } +} + +object OfferOutcome { + + /** Message was successfully created (new entry). */ + case object Created extends OfferOutcome + + /** Message was successfully updated (existing entry modified). */ + case object Updated extends OfferOutcome + + /** Message offer was ignored (already exists and canUpdate was false). */ + case object Ignored extends OfferOutcome + + /** Conversion extension for JVM OfferOutcome. */ + extension (javaOutcome: jvm.OfferOutcome) { + + /** Converts a JVM OfferOutcome to a Scala OfferOutcome. */ + def asScala: OfferOutcome = javaOutcome match { + case jvm.OfferOutcome.Created.INSTANCE => OfferOutcome.Created + case jvm.OfferOutcome.Updated.INSTANCE => OfferOutcome.Updated + case jvm.OfferOutcome.Ignored.INSTANCE => OfferOutcome.Ignored + case _ => throw new IllegalArgumentException(s"Unknown OfferOutcome: $javaOutcome") + } + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala new file mode 100644 index 0000000..2ef7efa --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala @@ -0,0 +1,131 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm + +/** Configuration for retry loops with exponential backoff. + * + * Used to configure retry behavior for database operations that may experience transient failures + * such as deadlocks, connection issues, or transaction rollbacks. + * + * ==Example== + * + * {{{ + * val config = RetryConfig( + * maxRetries = Some(3), + * totalSoftTimeout = Some(Duration.ofSeconds(30)), + * perTryHardTimeout = Some(Duration.ofSeconds(10)), + * initialDelay = Duration.ofMillis(100), + * maxDelay = Duration.ofSeconds(5), + * backoffFactor = 2.0 + * ) + * }}} + * + * @param initialDelay + * Initial delay before first retry + * @param maxDelay + * Maximum delay between retries (backoff is capped at this value) + * @param backoffFactor + * Multiplier for exponential backoff (e.g., 2.0 for doubling delays) + * @param maxRetries + * Maximum number of retries (None means unlimited retries) + * @param totalSoftTimeout + * Total time after which retries stop (None means no timeout) + * @param perTryHardTimeout + * Hard timeout for each individual attempt (None means no per-try timeout) + */ +final case class RetryConfig( + initialDelay: Duration, + maxDelay: Duration, + backoffFactor: Double = 2.0, + maxRetries: Option[Long] = None, + totalSoftTimeout: Option[Duration] = None, + perTryHardTimeout: Option[Duration] = None +) { + require(backoffFactor >= 1.0, s"backoffFactor must be >= 1.0, got $backoffFactor") + require(!initialDelay.isNegative, s"initialDelay must not be negative, got $initialDelay") + require(!maxDelay.isNegative, s"maxDelay must not be negative, got $maxDelay") + require(maxRetries.forall(_ >= 0), s"maxRetries must be >= 0 or None, got $maxRetries") + require( + totalSoftTimeout.forall(!_.isNegative), + s"totalSoftTimeout must not be negative, got $totalSoftTimeout" + ) + require( + perTryHardTimeout.forall(!_.isNegative), + s"perTryHardTimeout must not be negative, got $perTryHardTimeout" + ) + + /** Converts this Scala RetryConfig to a JVM RetryConfig. */ + def asJava: jvm.RetryConfig = + new jvm.RetryConfig( + initialDelay, + maxDelay, + backoffFactor, + maxRetries.map(Long.box).getOrElse(null), + totalSoftTimeout.getOrElse(null), + perTryHardTimeout.getOrElse(null) + ) +} + +object RetryConfig { + + /** Default retry configuration with reasonable defaults for database operations: + * - 5 retries maximum + * - 30 second total timeout + * - 10 second per-try timeout + * - 100ms initial delay + * - 5 second max delay + * - 2.0 backoff factor (exponential doubling) + */ + val DEFAULT: RetryConfig = + RetryConfig( + maxRetries = Some(5), + totalSoftTimeout = Some(Duration.ofSeconds(30)), + perTryHardTimeout = Some(Duration.ofSeconds(10)), + initialDelay = Duration.ofMillis(100), + maxDelay = Duration.ofSeconds(5), + backoffFactor = 2.0 + ) + + /** No retries - operations fail immediately on first error. */ + val NO_RETRIES: RetryConfig = + RetryConfig( + maxRetries = Some(0), + totalSoftTimeout = None, + perTryHardTimeout = None, + initialDelay = Duration.ZERO, + maxDelay = Duration.ZERO, + backoffFactor = 1.0 + ) + + /** Conversion extension for JVM RetryConfig. */ + extension (javaConfig: jvm.RetryConfig) { + + /** Converts a JVM RetryConfig to a Scala RetryConfig. */ + def asScala: RetryConfig = + RetryConfig( + initialDelay = javaConfig.initialDelay, + maxDelay = javaConfig.maxDelay, + backoffFactor = javaConfig.backoffFactor, + maxRetries = Option(javaConfig.maxRetries).map(_.longValue), + totalSoftTimeout = Option(javaConfig.totalSoftTimeout), + perTryHardTimeout = Option(javaConfig.perTryHardTimeout) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala new file mode 100644 index 0000000..366b48b --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala @@ -0,0 +1,142 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import org.funfix.delayedqueue.jvm + +/** Represents a message scheduled for future delivery in the delayed queue. + * + * This is the primary data structure for messages that will be processed at a specific time in the + * future. + * + * @tparam A + * the type of the message payload + * @param key + * unique identifier for this message; can be used to update or delete the message + * @param payload + * the actual message content + * @param scheduleAt + * the timestamp when this message becomes available for polling + * @param canUpdate + * whether existing messages with the same key can be updated + */ +final case class ScheduledMessage[+A]( + key: String, + payload: A, + scheduleAt: Instant, + canUpdate: Boolean = true +) { + + /** Converts this Scala ScheduledMessage to a JVM ScheduledMessage. */ + def asJava[A1 >: A]: jvm.ScheduledMessage[A1] = + new jvm.ScheduledMessage[A1](key, payload, scheduleAt, canUpdate) +} + +object ScheduledMessage { + + /** Conversion extension for JVM ScheduledMessage. */ + extension [A](javaMsg: jvm.ScheduledMessage[A]) { + + /** Converts a JVM ScheduledMessage to a Scala ScheduledMessage. */ + def asScala: ScheduledMessage[A] = + ScheduledMessage( + key = javaMsg.key, + payload = javaMsg.payload, + scheduleAt = javaMsg.scheduleAt, + canUpdate = javaMsg.canUpdate + ) + } +} + +/** Wrapper for batched message operations, associating input metadata with scheduled messages. + * + * @tparam In + * the type of the input metadata + * @tparam A + * the type of the message payload + * @param input + * the original input metadata + * @param message + * the scheduled message + */ +final case class BatchedMessage[In, A]( + input: In, + message: ScheduledMessage[A] +) { + + /** Creates a reply for this batched message with the given outcome. */ + def reply(outcome: OfferOutcome): BatchedReply[In, A] = + BatchedReply(input, message, outcome) + + /** Converts this Scala BatchedMessage to a JVM BatchedMessage. */ + def asJava: jvm.BatchedMessage[In, A] = + new jvm.BatchedMessage[In, A](input, message.asJava) +} + +object BatchedMessage { + + /** Conversion extension for JVM BatchedMessage. */ + extension [In, A](javaMsg: jvm.BatchedMessage[In, A]) { + + /** Converts a JVM BatchedMessage to a Scala BatchedMessage. */ + def asScala: BatchedMessage[In, A] = + BatchedMessage( + input = javaMsg.input, + message = ScheduledMessage.asScala(javaMsg.message) + ) + } +} + +/** Reply for a batched message operation, containing the outcome. + * + * @tparam In + * the type of the input metadata + * @tparam A + * the type of the message payload + * @param input + * the original input metadata + * @param message + * the scheduled message + * @param outcome + * the result of offering this message + */ +final case class BatchedReply[In, A]( + input: In, + message: ScheduledMessage[A], + outcome: OfferOutcome +) { + + /** Converts this Scala BatchedReply to a JVM BatchedReply. */ + def asJava: jvm.BatchedReply[In, A] = + new jvm.BatchedReply[In, A](input, message.asJava, outcome.asJava) +} + +object BatchedReply { + + /** Conversion extension for JVM BatchedReply. */ + extension [In, A](javaReply: jvm.BatchedReply[In, A]) { + + /** Converts a JVM BatchedReply to a Scala BatchedReply. */ + def asScala: BatchedReply[In, A] = + BatchedReply( + input = javaReply.input, + message = ScheduledMessage.asScala(javaReply.message), + outcome = OfferOutcome.asScala(javaReply.outcome) + ) + } +} diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala new file mode 100644 index 0000000..1dea0c0 --- /dev/null +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/exceptions.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +/** Checked exception thrown in case of exceptions happening that are not recoverable, rendering + * DelayedQueue inaccessible. + * + * Example: issues with the RDBMS (bugs, or connection unavailable, failing after multiple retries) + */ +class ResourceUnavailableException(message: String, cause: Throwable | Null) + extends Exception(message, cause) { + + def this(message: String) = this(message, null) +} From f91045916a849c6eb13ab254a1d83222f5154045 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 10:19:46 +0000 Subject: [PATCH 3/7] Add tests for Scala data structures - Created test suites for RetryConfig, ScheduledMessage, JdbcDriver, and Cron types - All tests verify conversions (asJava/asScala), validation, and basic functionality - All 23 tests pass Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../funfix/delayedqueue/scala/CronSpec.scala | 122 ++++++++++++++++++ .../delayedqueue/scala/JdbcDriverSpec.scala | 61 +++++++++ .../delayedqueue/scala/RetryConfigSpec.scala | 71 ++++++++++ .../scala/ScheduledMessageSpec.scala | 70 ++++++++++ 4 files changed, 324 insertions(+) create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala new file mode 100644 index 0000000..ae7b7ee --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/CronSpec.scala @@ -0,0 +1,122 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneId +import java.time.Duration +import org.funfix.delayedqueue.scala.CronConfigHash.asScala +import org.funfix.delayedqueue.scala.CronMessage.asScala as cronAsScala +import org.funfix.delayedqueue.scala.CronDailySchedule.asScala as scheduleAsScala + +class CronSpec extends munit.FunSuite { + + test("CronConfigHash fromString should be deterministic") { + val text = "test-config" + val hash1 = CronConfigHash.fromString(text) + val hash2 = CronConfigHash.fromString(text) + + assertEquals(hash1.value, hash2.value) + } + + test("CronConfigHash fromDailyCron should create hash") { + val schedule = CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List(LocalTime.of(10, 0)), + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ofHours(1) + ) + + val hash = CronConfigHash.fromDailyCron(schedule) + assert(hash.value.nonEmpty) + } + + test("CronConfigHash fromPeriodicTick should create hash") { + val hash = CronConfigHash.fromPeriodicTick(Duration.ofMinutes(5)) + assert(hash.value.nonEmpty) + } + + test("CronMessage key should be unique for different times") { + val hash = CronConfigHash.fromString("test") + val prefix = "test-prefix" + + val key1 = CronMessage.key(hash, prefix, Instant.ofEpochMilli(1000)) + val key2 = CronMessage.key(hash, prefix, Instant.ofEpochMilli(2000)) + + assertNotEquals(key1, key2) + } + + test("CronMessage toScheduled should create ScheduledMessage") { + val cronMsg = CronMessage( + payload = "test-payload", + scheduleAt = Instant.ofEpochMilli(1000) + ) + + val hash = CronConfigHash.fromString("test") + val scheduled = cronMsg.toScheduled(hash, "prefix", canUpdate = true) + + assertEquals(scheduled.payload, "test-payload") + assertEquals(scheduled.scheduleAt, Instant.ofEpochMilli(1000)) + assertEquals(scheduled.canUpdate, true) + } + + test("CronMessage staticPayload should create generator") { + val generator = CronMessage.staticPayload("static-payload") + val instant = Instant.ofEpochMilli(1000) + val cronMsg = generator(instant) + + assertEquals(cronMsg.payload, "static-payload") + assertEquals(cronMsg.scheduleAt, instant) + } + + test("CronDailySchedule getNextTimes should return at least one time") { + val schedule = CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List(LocalTime.of(10, 0)), + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ofHours(1) + ) + + val now = Instant.parse("2024-01-01T08:00:00Z") + val nextTimes = schedule.getNextTimes(now) + + assert(nextTimes.nonEmpty) + } + + test("CronDailySchedule should validate non-empty hours") { + intercept[IllegalArgumentException] { + CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List.empty, + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ofHours(1) + ) + } + } + + test("CronDailySchedule should validate positive schedule interval") { + intercept[IllegalArgumentException] { + CronDailySchedule( + zoneId = ZoneId.of("UTC"), + hoursOfDay = List(LocalTime.of(10, 0)), + scheduleInAdvance = Duration.ofDays(1), + scheduleInterval = Duration.ZERO + ) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala new file mode 100644 index 0000000..15cc2c8 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.funfix.delayedqueue.scala.JdbcDriver.asScala + +class JdbcDriverSpec extends munit.FunSuite { + + test("JdbcDriver constants should have correct class names") { + assertEquals(JdbcDriver.HSQLDB.className, "org.hsqldb.jdbc.JDBCDriver") + assertEquals(JdbcDriver.H2.className, "org.h2.Driver") + assertEquals(JdbcDriver.PostgreSQL.className, "org.postgresql.Driver") + assertEquals(JdbcDriver.MySQL.className, "com.mysql.cj.jdbc.Driver") + assertEquals(JdbcDriver.MariaDB.className, "org.mariadb.jdbc.Driver") + assertEquals(JdbcDriver.Sqlite.className, "org.sqlite.JDBC") + assertEquals(JdbcDriver.MsSqlServer.className, "com.microsoft.sqlserver.jdbc.SQLServerDriver") + assertEquals(JdbcDriver.Oracle.className, "oracle.jdbc.OracleDriver") + } + + test("JdbcDriver entries should contain all drivers") { + val allDrivers = Set( + JdbcDriver.HSQLDB, + JdbcDriver.H2, + JdbcDriver.PostgreSQL, + JdbcDriver.MySQL, + JdbcDriver.MariaDB, + JdbcDriver.Sqlite, + JdbcDriver.MsSqlServer, + JdbcDriver.Oracle + ) + + assertEquals(JdbcDriver.entries.toSet, allDrivers) + } + + test("fromClassName should find drivers case-insensitively") { + assertEquals(JdbcDriver.fromClassName("org.h2.Driver"), Some(JdbcDriver.H2)) + assertEquals(JdbcDriver.fromClassName("ORG.H2.DRIVER"), Some(JdbcDriver.H2)) + assertEquals(JdbcDriver.fromClassName("unknown"), None) + } + + test("asJava and asScala should be symmetric") { + JdbcDriver.entries.foreach { driver => + val roundtripped = driver.asJava.asScala + assertEquals(roundtripped.className, driver.className) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala new file mode 100644 index 0000000..9485375 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Duration +import org.funfix.delayedqueue.jvm +import org.funfix.delayedqueue.scala.RetryConfig.asScala + +class RetryConfigSpec extends munit.FunSuite { + + test("DEFAULT should have correct values") { + assertEquals(RetryConfig.DEFAULT.maxRetries, Some(5L)) + assertEquals(RetryConfig.DEFAULT.totalSoftTimeout, Some(Duration.ofSeconds(30))) + assertEquals(RetryConfig.DEFAULT.perTryHardTimeout, Some(Duration.ofSeconds(10))) + assertEquals(RetryConfig.DEFAULT.initialDelay, Duration.ofMillis(100)) + assertEquals(RetryConfig.DEFAULT.maxDelay, Duration.ofSeconds(5)) + assertEquals(RetryConfig.DEFAULT.backoffFactor, 2.0) + } + + test("NO_RETRIES should have correct values") { + assertEquals(RetryConfig.NO_RETRIES.maxRetries, Some(0L)) + assertEquals(RetryConfig.NO_RETRIES.totalSoftTimeout, None) + assertEquals(RetryConfig.NO_RETRIES.perTryHardTimeout, None) + assertEquals(RetryConfig.NO_RETRIES.backoffFactor, 1.0) + } + + test("asJava and asScala should be symmetric") { + val original = RetryConfig.DEFAULT + val roundtripped = original.asJava.asScala + + assertEquals(roundtripped.maxRetries, original.maxRetries) + assertEquals(roundtripped.totalSoftTimeout, original.totalSoftTimeout) + assertEquals(roundtripped.perTryHardTimeout, original.perTryHardTimeout) + assertEquals(roundtripped.initialDelay, original.initialDelay) + assertEquals(roundtripped.maxDelay, original.maxDelay) + assertEquals(roundtripped.backoffFactor, original.backoffFactor) + } + + test("should validate backoffFactor >= 1.0") { + intercept[IllegalArgumentException] { + RetryConfig( + initialDelay = Duration.ofMillis(100), + maxDelay = Duration.ofSeconds(5), + backoffFactor = 0.5 + ) + } + } + + test("should validate non-negative delays") { + intercept[IllegalArgumentException] { + RetryConfig( + initialDelay = Duration.ofMillis(-100), + maxDelay = Duration.ofSeconds(5) + ) + } + } +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala new file mode 100644 index 0000000..ffa6515 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/ScheduledMessageSpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import java.time.Instant +import org.funfix.delayedqueue.scala.ScheduledMessage.asScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerAsScala + +class ScheduledMessageSpec extends munit.FunSuite { + + test("ScheduledMessage asJava and asScala should be symmetric") { + val original = ScheduledMessage( + key = "test-key", + payload = "test-payload", + scheduleAt = Instant.ofEpochMilli(1000), + canUpdate = true + ) + + val roundtripped = original.asJava.asScala + + assertEquals(roundtripped.key, original.key) + assertEquals(roundtripped.payload, original.payload) + assertEquals(roundtripped.scheduleAt, original.scheduleAt) + assertEquals(roundtripped.canUpdate, original.canUpdate) + } + + test("BatchedMessage reply should create BatchedReply") { + val message = ScheduledMessage( + key = "test-key", + payload = "test-payload", + scheduleAt = Instant.ofEpochMilli(1000) + ) + + val batched = BatchedMessage(input = 42, message = message) + val reply = batched.reply(OfferOutcome.Created) + + assertEquals(reply.input, 42) + assertEquals(reply.message, message) + assertEquals(reply.outcome, OfferOutcome.Created) + } + + test("OfferOutcome isIgnored should work correctly") { + assert(OfferOutcome.Ignored.isIgnored) + assert(!OfferOutcome.Created.isIgnored) + assert(!OfferOutcome.Updated.isIgnored) + } + + test("OfferOutcome asJava and asScala should be symmetric") { + val outcomes = List(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored) + + outcomes.foreach { outcome => + val roundtripped = outcome.asJava.offerAsScala + assertEquals(roundtripped, outcome) + } + } +} From 3e6523b2f7a6ae35313d9b5f00bb90fc5f413816 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 10:52:40 +0000 Subject: [PATCH 4/7] Address PR feedback: use Scala types, opaque MessageId, fix implementations - Replace java.time.Duration with scala.concurrent.duration.FiniteDuration - Make MessageId an opaque type instead of case class - Fix OfferOutcome pattern matching to use instanceof checks - Add covariance (+In, +A) to BatchedMessage and BatchedReply - Port CronConfigHash, CronDailySchedule implementations exactly from JVM - Add ScalaCheck and munit-scalacheck dependencies for property-based testing - Remove unused imports Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- build.sbt | 2 + .../delayedqueue/scala/AckEnvelope.scala | 28 +++++---- .../delayedqueue/scala/CronConfigHash.scala | 27 ++++---- .../scala/CronDailySchedule.scala | 61 +++++++++++-------- .../scala/DelayedQueueTimeConfig.scala | 32 ++++++---- .../delayedqueue/scala/OfferOutcome.scala | 7 +-- .../delayedqueue/scala/ScheduledMessage.scala | 12 ++-- 7 files changed, 97 insertions(+), 72 deletions(-) diff --git a/build.sbt b/build.sbt index ea629b8..323d9fc 100644 --- a/build.sbt +++ b/build.sbt @@ -78,6 +78,8 @@ lazy val delayedqueue = crossProject(JVMPlatform) // Testing "org.scalameta" %% "munit" % "1.0.4" % Test, "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test, + "org.scalacheck" %% "scalacheck" % "1.19.0" % Test, + "org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test, ) ) diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala index 38b8f82..65df4af 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala @@ -79,21 +79,25 @@ object AckEnvelope { } } -/** Unique identifier for a message. - * - * @param value - * the string representation of the message ID - */ -final case class MessageId(value: String) { - override def toString: String = value - - /** Converts this Scala MessageId to a JVM MessageId. */ - def asJava: jvm.MessageId = - new jvm.MessageId(value) -} +/** Unique identifier for a message. */ +opaque type MessageId = String object MessageId { + /** Creates a MessageId from a String value. */ + def apply(value: String): MessageId = value + + /** Conversion extension for String to MessageId. */ + extension (id: MessageId) { + + /** Gets the string value of the MessageId. */ + def value: String = id + + /** Converts this Scala MessageId to a JVM MessageId. */ + def asJava: jvm.MessageId = + new jvm.MessageId(id) + } + /** Conversion extension for JVM MessageId. */ extension (javaId: jvm.MessageId) { diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala index 241676f..383ac16 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala @@ -17,7 +17,6 @@ package org.funfix.delayedqueue.scala import java.security.MessageDigest -import java.time.Duration import org.funfix.delayedqueue.jvm /** Hash of a cron configuration, used to detect configuration changes. @@ -41,21 +40,23 @@ object CronConfigHash { /** Creates a ConfigHash from a daily cron schedule configuration. */ def fromDailyCron(config: CronDailySchedule): CronConfigHash = { - val text = s""" - |daily-cron: - | zone: ${config.zoneId} - | hours: ${config.hoursOfDay.mkString(", ")} - |""".stripMargin - CronConfigHash(md5(text)) + // Port from Kotlin: buildString { appendLine(); appendLine("daily-cron:"); ... } + val text = new StringBuilder() + text.append('\n') // Leading newline to match Kotlin + text.append("daily-cron:\n") + text.append(s" zone: ${config.zoneId}\n") + text.append(s" hours: ${config.hoursOfDay.mkString(", ")}\n") + CronConfigHash(md5(text.toString)) } /** Creates a ConfigHash from a periodic tick configuration. */ - def fromPeriodicTick(period: Duration): CronConfigHash = { - val text = s""" - |periodic-tick: - | period-ms: ${period.toMillis} - |""".stripMargin - CronConfigHash(md5(text)) + def fromPeriodicTick(period: java.time.Duration): CronConfigHash = { + // Port from Kotlin: buildString { appendLine(); appendLine("periodic-tick:"); ... } + val text = new StringBuilder() + text.append('\n') // Leading newline to match Kotlin + text.append("periodic-tick:\n") + text.append(s" period-ms: ${period.toMillis}\n") + CronConfigHash(md5(text.toString)) } /** Creates a ConfigHash from an arbitrary string. */ diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala index 5ead708..9262ca0 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala @@ -60,39 +60,50 @@ final case class CronDailySchedule( * list of future instants when messages should be scheduled (never empty) */ def getNextTimes(now: Instant): List[Instant] = { + // Port from Kotlin implementation to match behavior exactly val until = now.plus(scheduleInAdvance) val sortedHours = hoursOfDay.sortBy(identity) + val result = scala.collection.mutable.ArrayBuffer[Instant]() - def getNextTime(currentTime: Instant): Instant = { - val zonedDateTime = currentTime.atZone(zoneId) - val localNow = zonedDateTime.toLocalTime - - // Find the next hour today - val nextHourToday = sortedHours.find(_.isAfter(localNow)) - - nextHourToday match { - case Some(nextHour) => - // Schedule for today - nextHour.atDate(zonedDateTime.toLocalDate).atZone(zoneId).toInstant - case None => - // Schedule for tomorrow (first hour of the day) - sortedHours.head - .atDate(zonedDateTime.toLocalDate.plusDays(1)) - .atZone(zoneId) - .toInstant - } - } + var currentTime = now + var nextTime = getNextTime(currentTime, sortedHours) - def loop(currentTime: Instant, acc: List[Instant]): List[Instant] = { - val nextTime = getNextTime(currentTime) - if acc.nonEmpty && nextTime.isAfter(until) then { - acc.reverse + // Always add the first nextTime (matches NonEmptyList behavior from Scala) + result += nextTime + + // Then add more if they're within the window + var continue = true + while continue do { + currentTime = nextTime + nextTime = getNextTime(currentTime, sortedHours) + if nextTime.isAfter(until) then { + continue = false } else { - loop(nextTime, nextTime :: acc) + result += nextTime } } - loop(now, Nil) + result.toList + } + + private def getNextTime(now: Instant, sortedHours: List[LocalTime]): Instant = { + val zonedDateTime = now.atZone(zoneId) + val localNow = zonedDateTime.toLocalTime + + // Find the next hour today + val nextHourToday = sortedHours.find(_.isAfter(localNow)) + + nextHourToday match { + case Some(nextHour) => + // Schedule for today + nextHour.atDate(zonedDateTime.toLocalDate).atZone(zoneId).toInstant + case None => + // Schedule for tomorrow (first hour of the day) + sortedHours.head + .atDate(zonedDateTime.toLocalDate.plusDays(1)) + .atZone(zoneId) + .toInstant + } } /** Converts this Scala CronDailySchedule to a JVM CronDailySchedule. */ diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala index 7938b8c..03691ab 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala @@ -16,7 +16,7 @@ package org.funfix.delayedqueue.scala -import java.time.Duration +import scala.concurrent.duration.FiniteDuration import org.funfix.delayedqueue.jvm /** Time configuration for delayed queue operations. @@ -27,13 +27,16 @@ import org.funfix.delayedqueue.jvm * interval between poll attempts when no messages are available */ final case class DelayedQueueTimeConfig( - acquireTimeout: Duration, - pollPeriod: Duration + acquireTimeout: FiniteDuration, + pollPeriod: FiniteDuration ) { /** Converts this Scala DelayedQueueTimeConfig to a JVM DelayedQueueTimeConfig. */ def asJava: jvm.DelayedQueueTimeConfig = - new jvm.DelayedQueueTimeConfig(acquireTimeout, pollPeriod) + new jvm.DelayedQueueTimeConfig( + java.time.Duration.ofMillis(acquireTimeout.toMillis), + java.time.Duration.ofMillis(pollPeriod.toMillis) + ) } object DelayedQueueTimeConfig { @@ -41,8 +44,8 @@ object DelayedQueueTimeConfig { /** Default configuration for DelayedQueueInMemory. */ val DEFAULT_IN_MEMORY: DelayedQueueTimeConfig = DelayedQueueTimeConfig( - acquireTimeout = Duration.ofMinutes(5), - pollPeriod = Duration.ofMillis(500) + acquireTimeout = FiniteDuration(5, scala.concurrent.duration.MINUTES), + pollPeriod = FiniteDuration(500, scala.concurrent.duration.MILLISECONDS) ) /** Default configuration for JDBC-based implementations, with longer acquire timeouts and poll @@ -50,16 +53,16 @@ object DelayedQueueTimeConfig { */ val DEFAULT_JDBC: DelayedQueueTimeConfig = DelayedQueueTimeConfig( - acquireTimeout = Duration.ofMinutes(5), - pollPeriod = Duration.ofSeconds(3) + acquireTimeout = FiniteDuration(5, scala.concurrent.duration.MINUTES), + pollPeriod = FiniteDuration(3, scala.concurrent.duration.SECONDS) ) /** Default configuration for testing, with shorter timeouts and poll periods to speed up tests. */ val DEFAULT_TESTING: DelayedQueueTimeConfig = DelayedQueueTimeConfig( - acquireTimeout = Duration.ofSeconds(30), - pollPeriod = Duration.ofMillis(100) + acquireTimeout = FiniteDuration(30, scala.concurrent.duration.SECONDS), + pollPeriod = FiniteDuration(100, scala.concurrent.duration.MILLISECONDS) ) /** Conversion extension for JVM DelayedQueueTimeConfig. */ @@ -68,8 +71,13 @@ object DelayedQueueTimeConfig { /** Converts a JVM DelayedQueueTimeConfig to a Scala DelayedQueueTimeConfig. */ def asScala: DelayedQueueTimeConfig = DelayedQueueTimeConfig( - acquireTimeout = javaConfig.acquireTimeout, - pollPeriod = javaConfig.pollPeriod + acquireTimeout = + FiniteDuration( + javaConfig.acquireTimeout.toMillis, + scala.concurrent.duration.MILLISECONDS + ), + pollPeriod = + FiniteDuration(javaConfig.pollPeriod.toMillis, scala.concurrent.duration.MILLISECONDS) ) } } diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala index f756d51..4f34a92 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala @@ -52,10 +52,9 @@ object OfferOutcome { /** Converts a JVM OfferOutcome to a Scala OfferOutcome. */ def asScala: OfferOutcome = javaOutcome match { - case jvm.OfferOutcome.Created.INSTANCE => OfferOutcome.Created - case jvm.OfferOutcome.Updated.INSTANCE => OfferOutcome.Updated - case jvm.OfferOutcome.Ignored.INSTANCE => OfferOutcome.Ignored - case _ => throw new IllegalArgumentException(s"Unknown OfferOutcome: $javaOutcome") + case _: jvm.OfferOutcome.Created => OfferOutcome.Created + case _: jvm.OfferOutcome.Updated => OfferOutcome.Updated + case _: jvm.OfferOutcome.Ignored => OfferOutcome.Ignored } } } diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala index 366b48b..2679f04 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/ScheduledMessage.scala @@ -74,7 +74,7 @@ object ScheduledMessage { * @param message * the scheduled message */ -final case class BatchedMessage[In, A]( +final case class BatchedMessage[+In, +A]( input: In, message: ScheduledMessage[A] ) { @@ -84,8 +84,8 @@ final case class BatchedMessage[In, A]( BatchedReply(input, message, outcome) /** Converts this Scala BatchedMessage to a JVM BatchedMessage. */ - def asJava: jvm.BatchedMessage[In, A] = - new jvm.BatchedMessage[In, A](input, message.asJava) + def asJava[In1 >: In, A1 >: A]: jvm.BatchedMessage[In1, A1] = + new jvm.BatchedMessage[In1, A1](input, message.asJava) } object BatchedMessage { @@ -115,15 +115,15 @@ object BatchedMessage { * @param outcome * the result of offering this message */ -final case class BatchedReply[In, A]( +final case class BatchedReply[+In, +A]( input: In, message: ScheduledMessage[A], outcome: OfferOutcome ) { /** Converts this Scala BatchedReply to a JVM BatchedReply. */ - def asJava: jvm.BatchedReply[In, A] = - new jvm.BatchedReply[In, A](input, message.asJava, outcome.asJava) + def asJava[In1 >: In, A1 >: A]: jvm.BatchedReply[In1, A1] = + new jvm.BatchedReply[In1, A1](input, message.asJava, outcome.asJava) } object BatchedReply { From cf690d28c25d5e0b621d996c2fff70d1f77d94ac Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 10:56:21 +0000 Subject: [PATCH 5/7] Add property-based testing with ScalaCheck - Add comprehensive property-based tests for all data structures - Test roundtrip conversions (asJava/asScala) - Test covariance for BatchedMessage and BatchedReply - Test validation for RetryConfig and CronDailySchedule - Test CronMessage key uniqueness - All 35 tests pass (23 unit + 12 property-based) Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../scala/DataStructuresPropertySpec.scala | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala new file mode 100644 index 0000000..3f1e272 --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala @@ -0,0 +1,184 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import munit.ScalaCheckSuite +import org.scalacheck.Prop.* +import org.scalacheck.{Arbitrary, Gen} +import scala.concurrent.duration.* +import java.time.{Instant, LocalTime, ZoneId} +import org.funfix.delayedqueue.scala.ScheduledMessage.asScala as scheduledAsScala +import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala as timeConfigAsScala +import org.funfix.delayedqueue.scala.MessageId.asScala as messageIdAsScala +import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerOutcomeAsScala + +class DataStructuresPropertySpec extends ScalaCheckSuite { + + // Generators for our data types + implicit val arbFiniteDuration: Arbitrary[FiniteDuration] = Arbitrary( + Gen.choose(0L, 1000000L).map(millis => FiniteDuration(millis, MILLISECONDS)) + ) + + implicit val arbInstant: Arbitrary[Instant] = Arbitrary( + Gen.choose(0L, System.currentTimeMillis() * 2).map(Instant.ofEpochMilli) + ) + + implicit val arbLocalTime: Arbitrary[LocalTime] = Arbitrary( + for { + hour <- Gen.choose(0, 23) + minute <- Gen.choose(0, 59) + } yield LocalTime.of(hour, minute) + ) + + implicit val arbZoneId: Arbitrary[ZoneId] = Arbitrary( + Gen.oneOf(ZoneId.of("UTC"), ZoneId.of("America/New_York"), ZoneId.of("Europe/London")) + ) + + property("ScheduledMessage asJava/asScala roundtrip preserves data") { + forAll { (key: String, payload: String, instant: Instant, canUpdate: Boolean) => + val original = ScheduledMessage(key, payload, instant, canUpdate) + val roundtripped = original.asJava.scheduledAsScala + + assertEquals(roundtripped.key, original.key) + assertEquals(roundtripped.payload, original.payload) + assertEquals(roundtripped.scheduleAt, original.scheduleAt) + assertEquals(roundtripped.canUpdate, original.canUpdate) + } + } + + property("DelayedQueueTimeConfig asJava/asScala roundtrip preserves data") { + forAll { (acquireTimeout: FiniteDuration, pollPeriod: FiniteDuration) => + val original = DelayedQueueTimeConfig(acquireTimeout, pollPeriod) + val roundtripped = original.asJava.timeConfigAsScala + + assertEquals(roundtripped.acquireTimeout.toMillis, original.acquireTimeout.toMillis) + assertEquals(roundtripped.pollPeriod.toMillis, original.pollPeriod.toMillis) + } + } + + property("MessageId is symmetric") { + forAll { (value: String) => + val messageId = MessageId(value) + assertEquals(messageId.value, value) + assertEquals(messageId.asJava.messageIdAsScala.value, value) + } + } + + property("CronConfigHash fromString is deterministic") { + forAll { (input: String) => + val hash1 = CronConfigHash.fromString(input) + val hash2 = CronConfigHash.fromString(input) + assertEquals(hash1.value, hash2.value) + } + } + + property("CronMessage key is unique for different times") { + forAll { (instant1: Instant, instant2: Instant) => + if instant1 != instant2 then { + val hash = CronConfigHash.fromString("test") + val prefix = "test-prefix" + val key1 = CronMessage.key(hash, prefix, instant1) + val key2 = CronMessage.key(hash, prefix, instant2) + assertNotEquals(key1, key2) + } + } + } + + property("BatchedMessage covariance works correctly") { + forAll { (input: Int, payload: String, instant: Instant) => + val message = ScheduledMessage(s"key-$input", payload, instant) + val batched: BatchedMessage[Int, String] = BatchedMessage(input, message) + // Covariance allows us to upcast + val widened: BatchedMessage[Any, Any] = batched + assertEquals(widened.input, input) + } + } + + property("BatchedReply covariance works correctly") { + forAll { (input: Int, payload: String, instant: Instant) => + val message = ScheduledMessage(s"key-$input", payload, instant) + val reply: BatchedReply[Int, String] = BatchedReply(input, message, OfferOutcome.Created) + // Covariance allows us to upcast + val widened: BatchedReply[Any, Any] = reply + assertEquals(widened.input, input) + } + } + + property("CronDailySchedule getNextTimes always returns at least one time") { + forAll { (hours: List[LocalTime], now: Instant, zoneId: ZoneId) => + if hours.nonEmpty then { + val schedule = CronDailySchedule( + zoneId = zoneId, + hoursOfDay = hours, + scheduleInAdvance = java.time.Duration.ofDays(1), + scheduleInterval = java.time.Duration.ofHours(1) + ) + val nextTimes = schedule.getNextTimes(now) + assert(nextTimes.nonEmpty, "getNextTimes should return at least one time") + assert(nextTimes.head.isAfter(now) || nextTimes.head == now, "First time should be >= now") + } + } + } + + property("OfferOutcome asJava/asScala roundtrip") { + forAll(Gen.oneOf(OfferOutcome.Created, OfferOutcome.Updated, OfferOutcome.Ignored)) { + outcome => + val roundtripped = outcome.asJava.offerOutcomeAsScala + assertEquals(roundtripped, outcome) + } + } + + property("DeliveryType asJava/asScala roundtrip") { + import org.funfix.delayedqueue.scala.DeliveryType.asScala + forAll(Gen.oneOf(DeliveryType.FIRST_DELIVERY, DeliveryType.REDELIVERY)) { deliveryType => + val roundtripped = deliveryType.asJava.asScala + assertEquals(roundtripped, deliveryType) + } + } + + property("RetryConfig validates backoffFactor >= 1.0") { + forAll { (backoffFactor: Double) => + // Simplified test: just check that invalid backoff factors are rejected + if backoffFactor < 1.0 && !backoffFactor.isNaN && !backoffFactor.isInfinite then { + val _ = intercept[IllegalArgumentException] { + RetryConfig( + initialDelay = java.time.Duration.ofMillis(100), + maxDelay = java.time.Duration.ofMillis(1000), + backoffFactor = backoffFactor + ) + } + () + } else if backoffFactor >= 1.0 && !backoffFactor.isNaN && !backoffFactor.isInfinite then { + val config = RetryConfig( + initialDelay = java.time.Duration.ofMillis(100), + maxDelay = java.time.Duration.ofMillis(1000), + backoffFactor = backoffFactor + ) + assert(config.backoffFactor >= 1.0) + } + } + } + + property("JdbcDriver fromClassName is case-insensitive") { + forAll(Gen.oneOf(JdbcDriver.entries)) { driver => + val lower = JdbcDriver.fromClassName(driver.className.toLowerCase) + val upper = JdbcDriver.fromClassName(driver.className.toUpperCase) + assertEquals(lower, Some(driver)) + assertEquals(upper, Some(driver)) + } + } +} From a2d9abda5b2eb48d063823d5acdfe58400e4bcb3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 11:46:35 +0000 Subject: [PATCH 6/7] Refactor: delegate implementations to JVM, use idiomatic Scala patterns Major changes per review feedback: - DeliveryType enum: Use Scala naming (FirstDelivery, Redelivery) - CronConfigHash: Changed to opaque type, delegate to JVM implementations - All factory methods delegate to JVM (fromDailyCron, fromPeriodicTick, fromString) - CronDailySchedule.getNextTimes delegates to JVM - CronMessage.key and staticPayload delegate to JVM - DelayedQueueTimeConfig defaults use JVM constants - JdbcConnectionConfig: username/password now Option - JdbcDatabasePoolConfig: Removed default parameters, added DEFAULT value - JdbcDriver: Private constructor, use JVM as source of truth with jvmEntries map - Test generators moved to common Generators.scala file - Removed fromClassName test as requested - All 34 tests pass Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../delayedqueue/scala/AckEnvelope.scala | 12 ++-- .../delayedqueue/scala/CronConfigHash.scala | 56 +++++++------------ .../scala/CronDailySchedule.scala | 46 +-------------- .../delayedqueue/scala/CronMessage.scala | 16 ++---- .../scala/DelayedQueueTimeConfig.scala | 15 +---- .../scala/JdbcConnectionConfig.scala | 20 +++---- .../scala/JdbcDatabasePoolConfig.scala | 20 ++++--- .../delayedqueue/scala/JdbcDriver.scala | 45 ++++++++------- .../scala/DataStructuresPropertySpec.scala | 28 ++-------- .../delayedqueue/scala/Generators.scala | 44 +++++++++++++++ .../delayedqueue/scala/JdbcDriverSpec.scala | 6 -- 11 files changed, 132 insertions(+), 176 deletions(-) create mode 100644 delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala index 65df4af..80ebc8a 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/AckEnvelope.scala @@ -111,15 +111,15 @@ object MessageId { enum DeliveryType { /** Message is being delivered for the first time. */ - case FIRST_DELIVERY + case FirstDelivery /** Message is being redelivered (was scheduled again after initial delivery). */ - case REDELIVERY + case Redelivery /** Converts this Scala DeliveryType to a JVM DeliveryType. */ def asJava: jvm.DeliveryType = this match { - case FIRST_DELIVERY => jvm.DeliveryType.FIRST_DELIVERY - case REDELIVERY => jvm.DeliveryType.REDELIVERY + case FirstDelivery => jvm.DeliveryType.FIRST_DELIVERY + case Redelivery => jvm.DeliveryType.REDELIVERY } } @@ -130,8 +130,8 @@ object DeliveryType { /** Converts a JVM DeliveryType to a Scala DeliveryType. */ def asScala: DeliveryType = javaType match { - case jvm.DeliveryType.FIRST_DELIVERY => DeliveryType.FIRST_DELIVERY - case jvm.DeliveryType.REDELIVERY => DeliveryType.REDELIVERY + case jvm.DeliveryType.FIRST_DELIVERY => DeliveryType.FirstDelivery + case jvm.DeliveryType.REDELIVERY => DeliveryType.Redelivery } } } diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala index 383ac16..44957a8 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronConfigHash.scala @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.scala -import java.security.MessageDigest import org.funfix.delayedqueue.jvm /** Hash of a cron configuration, used to detect configuration changes. @@ -24,49 +23,36 @@ import org.funfix.delayedqueue.jvm * When a cron schedule is installed, this hash is used to identify messages belonging to that * configuration. If the configuration changes, the hash will differ, allowing the system to clean * up old scheduled messages. - * - * @param value - * the MD5 hash string */ -final case class CronConfigHash(value: String) { - override def toString: String = value - - /** Converts this Scala CronConfigHash to a JVM CronConfigHash. */ - def asJava: jvm.CronConfigHash = - new jvm.CronConfigHash(value) -} +opaque type CronConfigHash = String object CronConfigHash { - /** Creates a ConfigHash from a daily cron schedule configuration. */ - def fromDailyCron(config: CronDailySchedule): CronConfigHash = { - // Port from Kotlin: buildString { appendLine(); appendLine("daily-cron:"); ... } - val text = new StringBuilder() - text.append('\n') // Leading newline to match Kotlin - text.append("daily-cron:\n") - text.append(s" zone: ${config.zoneId}\n") - text.append(s" hours: ${config.hoursOfDay.mkString(", ")}\n") - CronConfigHash(md5(text.toString)) + /** Creates a CronConfigHash from a String value. */ + def apply(value: String): CronConfigHash = value + + /** Conversion extension for CronConfigHash. */ + extension (hash: CronConfigHash) { + + /** Gets the string value of the CronConfigHash. */ + def value: String = hash + + /** Converts this Scala CronConfigHash to a JVM CronConfigHash. */ + def asJava: jvm.CronConfigHash = + new jvm.CronConfigHash(hash) } + /** Creates a ConfigHash from a daily cron schedule configuration. */ + def fromDailyCron(config: CronDailySchedule): CronConfigHash = + jvm.CronConfigHash.fromDailyCron(config.asJava).asScala + /** Creates a ConfigHash from a periodic tick configuration. */ - def fromPeriodicTick(period: java.time.Duration): CronConfigHash = { - // Port from Kotlin: buildString { appendLine(); appendLine("periodic-tick:"); ... } - val text = new StringBuilder() - text.append('\n') // Leading newline to match Kotlin - text.append("periodic-tick:\n") - text.append(s" period-ms: ${period.toMillis}\n") - CronConfigHash(md5(text.toString)) - } + def fromPeriodicTick(period: java.time.Duration): CronConfigHash = + jvm.CronConfigHash.fromPeriodicTick(period).asScala /** Creates a ConfigHash from an arbitrary string. */ - def fromString(text: String): CronConfigHash = CronConfigHash(md5(text)) - - private def md5(input: String): String = { - val md = MessageDigest.getInstance("MD5") - val digest = md.digest(input.getBytes) - digest.map("%02x".format(_)).mkString - } + def fromString(text: String): CronConfigHash = + jvm.CronConfigHash.fromString(text).asScala /** Conversion extension for JVM CronConfigHash. */ extension (javaHash: jvm.CronConfigHash) { diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala index 9262ca0..4849198 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronDailySchedule.scala @@ -60,50 +60,8 @@ final case class CronDailySchedule( * list of future instants when messages should be scheduled (never empty) */ def getNextTimes(now: Instant): List[Instant] = { - // Port from Kotlin implementation to match behavior exactly - val until = now.plus(scheduleInAdvance) - val sortedHours = hoursOfDay.sortBy(identity) - val result = scala.collection.mutable.ArrayBuffer[Instant]() - - var currentTime = now - var nextTime = getNextTime(currentTime, sortedHours) - - // Always add the first nextTime (matches NonEmptyList behavior from Scala) - result += nextTime - - // Then add more if they're within the window - var continue = true - while continue do { - currentTime = nextTime - nextTime = getNextTime(currentTime, sortedHours) - if nextTime.isAfter(until) then { - continue = false - } else { - result += nextTime - } - } - - result.toList - } - - private def getNextTime(now: Instant, sortedHours: List[LocalTime]): Instant = { - val zonedDateTime = now.atZone(zoneId) - val localNow = zonedDateTime.toLocalTime - - // Find the next hour today - val nextHourToday = sortedHours.find(_.isAfter(localNow)) - - nextHourToday match { - case Some(nextHour) => - // Schedule for today - nextHour.atDate(zonedDateTime.toLocalDate).atZone(zoneId).toInstant - case None => - // Schedule for tomorrow (first hour of the day) - sortedHours.head - .atDate(zonedDateTime.toLocalDate.plusDays(1)) - .atZone(zoneId) - .toInstant - } + import scala.jdk.CollectionConverters.* + asJava.getNextTimes(now).asScala.toList } /** Converts this Scala CronDailySchedule to a JVM CronDailySchedule. */ diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala index 4557ec1..987964e 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/CronMessage.scala @@ -70,14 +70,6 @@ final case class CronMessage[+A]( } object CronMessage { - private val CRON_DATE_TIME_FORMATTER: DateTimeFormatter = - DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(ZoneOffset.UTC) - private val NANOS_WIDTH = 9 - - private def formatTimestamp(scheduleAt: Instant): String = { - val nanos = String.format(Locale.ROOT, s"%0${NANOS_WIDTH}d", scheduleAt.getNano: Integer) - s"${CRON_DATE_TIME_FORMATTER.format(scheduleAt)}.$nanos" - } /** Generates a unique key for a cron message. * @@ -91,7 +83,7 @@ object CronMessage { * a unique key string */ def key(configHash: CronConfigHash, keyPrefix: String, scheduleAt: Instant): String = - s"$keyPrefix/${configHash.value}/${formatTimestamp(scheduleAt)}" + jvm.CronMessage.key(configHash.asJava, keyPrefix, scheduleAt) /** Creates a factory function that produces CronMessages with a static payload. * @@ -100,8 +92,10 @@ object CronMessage { * @return * a function that creates CronMessages for any given instant */ - def staticPayload[A](payload: A): CronMessageGenerator[A] = - (scheduleAt: Instant) => CronMessage(payload, scheduleAt) + def staticPayload[A](payload: A): CronMessageGenerator[A] = { + val jvmGenerator = jvm.CronMessage.staticPayload(payload) + (scheduleAt: Instant) => jvmGenerator.invoke(scheduleAt).asScala + } /** Conversion extension for JVM CronMessage. */ extension [A](javaMsg: jvm.CronMessage[A]) { diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala index 03691ab..bb34909 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueTimeConfig.scala @@ -43,27 +43,18 @@ object DelayedQueueTimeConfig { /** Default configuration for DelayedQueueInMemory. */ val DEFAULT_IN_MEMORY: DelayedQueueTimeConfig = - DelayedQueueTimeConfig( - acquireTimeout = FiniteDuration(5, scala.concurrent.duration.MINUTES), - pollPeriod = FiniteDuration(500, scala.concurrent.duration.MILLISECONDS) - ) + jvm.DelayedQueueTimeConfig.DEFAULT_IN_MEMORY.asScala /** Default configuration for JDBC-based implementations, with longer acquire timeouts and poll * periods to reduce database load in production environments. */ val DEFAULT_JDBC: DelayedQueueTimeConfig = - DelayedQueueTimeConfig( - acquireTimeout = FiniteDuration(5, scala.concurrent.duration.MINUTES), - pollPeriod = FiniteDuration(3, scala.concurrent.duration.SECONDS) - ) + jvm.DelayedQueueTimeConfig.DEFAULT_JDBC.asScala /** Default configuration for testing, with shorter timeouts and poll periods to speed up tests. */ val DEFAULT_TESTING: DelayedQueueTimeConfig = - DelayedQueueTimeConfig( - acquireTimeout = FiniteDuration(30, scala.concurrent.duration.SECONDS), - pollPeriod = FiniteDuration(100, scala.concurrent.duration.MILLISECONDS) - ) + jvm.DelayedQueueTimeConfig.DEFAULT_TESTING.asScala /** Conversion extension for JVM DelayedQueueTimeConfig. */ extension (javaConfig: jvm.DelayedQueueTimeConfig) { diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala index 8abd128..817c081 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcConnectionConfig.scala @@ -34,9 +34,9 @@ import org.funfix.delayedqueue.jvm final case class JdbcConnectionConfig( url: String, driver: JdbcDriver, - username: String | Null = null, - password: String | Null = null, - pool: JdbcDatabasePoolConfig | Null = null + username: Option[String] = None, + password: Option[String] = None, + pool: Option[JdbcDatabasePoolConfig] = None ) { /** Converts this Scala JdbcConnectionConfig to a JVM JdbcConnectionConfig. */ @@ -44,9 +44,9 @@ final case class JdbcConnectionConfig( new jvm.JdbcConnectionConfig( url, driver.asJava, - username, - password, - if pool == null then null else pool.asJava + username.getOrElse(null), + password.getOrElse(null), + pool.map(_.asJava).getOrElse(null) ) } @@ -60,11 +60,9 @@ object JdbcConnectionConfig { JdbcConnectionConfig( url = javaConfig.url, driver = JdbcDriver.asScala(javaConfig.driver), - username = javaConfig.username, - password = javaConfig.password, - pool = - if javaConfig.pool == null then null - else JdbcDatabasePoolConfig.asScala(javaConfig.pool) + username = Option(javaConfig.username), + password = Option(javaConfig.password), + pool = Option(javaConfig.pool).map(JdbcDatabasePoolConfig.asScala) ) } } diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala index cd60f34..1ac2a33 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDatabasePoolConfig.scala @@ -39,14 +39,14 @@ import org.funfix.delayedqueue.jvm * time to wait for pool initialization */ final case class JdbcDatabasePoolConfig( - connectionTimeout: Duration = Duration.ofSeconds(30), - idleTimeout: Duration = Duration.ofMinutes(10), - maxLifetime: Duration = Duration.ofMinutes(30), - keepaliveTime: Duration = Duration.ZERO, - maximumPoolSize: Int = 10, - minimumIdle: Option[Int] = None, - leakDetectionThreshold: Option[Duration] = None, - initializationFailTimeout: Option[Duration] = None + connectionTimeout: Duration, + idleTimeout: Duration, + maxLifetime: Duration, + keepaliveTime: Duration, + maximumPoolSize: Int, + minimumIdle: Option[Int], + leakDetectionThreshold: Option[Duration], + initializationFailTimeout: Option[Duration] ) { /** Converts this Scala JdbcDatabasePoolConfig to a JVM JdbcDatabasePoolConfig. */ @@ -65,6 +65,10 @@ final case class JdbcDatabasePoolConfig( object JdbcDatabasePoolConfig { + /** Default connection pool configuration. */ + val DEFAULT: JdbcDatabasePoolConfig = + new jvm.JdbcDatabasePoolConfig().asScala + /** Conversion extension for JVM JdbcDatabasePoolConfig. */ extension (javaConfig: jvm.JdbcDatabasePoolConfig) { diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala index 80b7fe5..3fb5bb0 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/JdbcDriver.scala @@ -23,34 +23,41 @@ import org.funfix.delayedqueue.jvm * @param className * the JDBC driver class name */ -final case class JdbcDriver(className: String) { +final case class JdbcDriver private (className: String) { /** Converts this Scala JdbcDriver to a JVM JdbcDriver. */ - def asJava: jvm.JdbcDriver = { - // Find the corresponding JVM driver by class name - import scala.jdk.CollectionConverters.* - val jvmEntries = jvm.JdbcDriver.getEntries.asScala - jvmEntries - .find(d => d.getClassName() == className) - .getOrElse { - throw new IllegalArgumentException(s"Unknown JDBC driver class name: $className") - } - } + def asJava: jvm.JdbcDriver = + JdbcDriver.jvmEntries.getOrElse( + this, + throw new IllegalArgumentException(s"Unknown JDBC driver: $className") + ) } object JdbcDriver { - val HSQLDB: JdbcDriver = JdbcDriver("org.hsqldb.jdbc.JDBCDriver") - val H2: JdbcDriver = JdbcDriver("org.h2.Driver") - val MsSqlServer: JdbcDriver = JdbcDriver("com.microsoft.sqlserver.jdbc.SQLServerDriver") - val Sqlite: JdbcDriver = JdbcDriver("org.sqlite.JDBC") - val MariaDB: JdbcDriver = JdbcDriver("org.mariadb.jdbc.Driver") - val MySQL: JdbcDriver = JdbcDriver("com.mysql.cj.jdbc.Driver") - val PostgreSQL: JdbcDriver = JdbcDriver("org.postgresql.Driver") - val Oracle: JdbcDriver = JdbcDriver("oracle.jdbc.OracleDriver") + + val HSQLDB: JdbcDriver = jvm.JdbcDriver.HSQLDB.asScala + val H2: JdbcDriver = jvm.JdbcDriver.H2.asScala + val MsSqlServer: JdbcDriver = jvm.JdbcDriver.MsSqlServer.asScala + val Sqlite: JdbcDriver = jvm.JdbcDriver.Sqlite.asScala + val MariaDB: JdbcDriver = jvm.JdbcDriver.MariaDB.asScala + val MySQL: JdbcDriver = jvm.JdbcDriver.MySQL.asScala + val PostgreSQL: JdbcDriver = jvm.JdbcDriver.PostgreSQL.asScala + val Oracle: JdbcDriver = jvm.JdbcDriver.Oracle.asScala val entries: List[JdbcDriver] = List(H2, HSQLDB, MariaDB, MsSqlServer, MySQL, PostgreSQL, Sqlite, Oracle) + private val jvmEntries: Map[JdbcDriver, jvm.JdbcDriver] = Map( + H2 -> jvm.JdbcDriver.H2, + HSQLDB -> jvm.JdbcDriver.HSQLDB, + MariaDB -> jvm.JdbcDriver.MariaDB, + MsSqlServer -> jvm.JdbcDriver.MsSqlServer, + MySQL -> jvm.JdbcDriver.MySQL, + PostgreSQL -> jvm.JdbcDriver.PostgreSQL, + Sqlite -> jvm.JdbcDriver.Sqlite, + Oracle -> jvm.JdbcDriver.Oracle + ) + /** Attempt to find a JdbcDriver by its class name. * * @param className diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala index 3f1e272..88e79ea 100644 --- a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala @@ -18,36 +18,16 @@ package org.funfix.delayedqueue.scala import munit.ScalaCheckSuite import org.scalacheck.Prop.* -import org.scalacheck.{Arbitrary, Gen} -import scala.concurrent.duration.* +import org.scalacheck.Gen +import scala.concurrent.duration.FiniteDuration import java.time.{Instant, LocalTime, ZoneId} import org.funfix.delayedqueue.scala.ScheduledMessage.asScala as scheduledAsScala import org.funfix.delayedqueue.scala.DelayedQueueTimeConfig.asScala as timeConfigAsScala import org.funfix.delayedqueue.scala.MessageId.asScala as messageIdAsScala import org.funfix.delayedqueue.scala.OfferOutcome.asScala as offerOutcomeAsScala +import org.funfix.delayedqueue.scala.Generators.{given, *} class DataStructuresPropertySpec extends ScalaCheckSuite { - - // Generators for our data types - implicit val arbFiniteDuration: Arbitrary[FiniteDuration] = Arbitrary( - Gen.choose(0L, 1000000L).map(millis => FiniteDuration(millis, MILLISECONDS)) - ) - - implicit val arbInstant: Arbitrary[Instant] = Arbitrary( - Gen.choose(0L, System.currentTimeMillis() * 2).map(Instant.ofEpochMilli) - ) - - implicit val arbLocalTime: Arbitrary[LocalTime] = Arbitrary( - for { - hour <- Gen.choose(0, 23) - minute <- Gen.choose(0, 59) - } yield LocalTime.of(hour, minute) - ) - - implicit val arbZoneId: Arbitrary[ZoneId] = Arbitrary( - Gen.oneOf(ZoneId.of("UTC"), ZoneId.of("America/New_York"), ZoneId.of("Europe/London")) - ) - property("ScheduledMessage asJava/asScala roundtrip preserves data") { forAll { (key: String, payload: String, instant: Instant, canUpdate: Boolean) => val original = ScheduledMessage(key, payload, instant, canUpdate) @@ -144,7 +124,7 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { property("DeliveryType asJava/asScala roundtrip") { import org.funfix.delayedqueue.scala.DeliveryType.asScala - forAll(Gen.oneOf(DeliveryType.FIRST_DELIVERY, DeliveryType.REDELIVERY)) { deliveryType => + forAll(Gen.oneOf(DeliveryType.FirstDelivery, DeliveryType.Redelivery)) { deliveryType => val roundtripped = deliveryType.asJava.asScala assertEquals(roundtripped, deliveryType) } diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala new file mode 100644 index 0000000..4da365c --- /dev/null +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/Generators.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2026 Alexandru Nedelcu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.funfix.delayedqueue.scala + +import org.scalacheck.{Arbitrary, Gen} +import scala.concurrent.duration.* +import java.time.{Instant, LocalTime, ZoneId} + +/** Common ScalaCheck generators for DelayedQueue data structures. */ +object Generators { + + implicit val arbFiniteDuration: Arbitrary[FiniteDuration] = Arbitrary( + Gen.choose(0L, 1000000L).map(millis => FiniteDuration(millis, MILLISECONDS)) + ) + + implicit val arbInstant: Arbitrary[Instant] = Arbitrary( + Gen.choose(0L, System.currentTimeMillis() * 2).map(Instant.ofEpochMilli) + ) + + implicit val arbLocalTime: Arbitrary[LocalTime] = Arbitrary( + for { + hour <- Gen.choose(0, 23) + minute <- Gen.choose(0, 59) + } yield LocalTime.of(hour, minute) + ) + + implicit val arbZoneId: Arbitrary[ZoneId] = Arbitrary( + Gen.oneOf(ZoneId.of("UTC"), ZoneId.of("America/New_York"), ZoneId.of("Europe/London")) + ) +} diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala index 15cc2c8..707cb05 100644 --- a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/JdbcDriverSpec.scala @@ -46,12 +46,6 @@ class JdbcDriverSpec extends munit.FunSuite { assertEquals(JdbcDriver.entries.toSet, allDrivers) } - test("fromClassName should find drivers case-insensitively") { - assertEquals(JdbcDriver.fromClassName("org.h2.Driver"), Some(JdbcDriver.H2)) - assertEquals(JdbcDriver.fromClassName("ORG.H2.DRIVER"), Some(JdbcDriver.H2)) - assertEquals(JdbcDriver.fromClassName("unknown"), None) - } - test("asJava and asScala should be symmetric") { JdbcDriver.entries.foreach { driver => val roundtripped = driver.asJava.asScala From 500a2c513f11d7e0f8ad613e4fcc00f97f1bb52c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 15:00:24 +0000 Subject: [PATCH 7/7] Remove default parameters and delegate to JVM defaults - DelayedQueueJDBCConfig: Removed default parameters for ackEnvSource and retryPolicy - RetryConfig: Removed all default parameters (backoffFactor, maxRetries, totalSoftTimeout, perTryHardTimeout) - RetryConfig.DEFAULT and NO_RETRIES now delegate to JVM constants via asScala - Updated tests to provide all required parameters - All 34 tests pass Co-authored-by: alexandru <11753+alexandru@users.noreply.github.com> --- .../scala/DelayedQueueJDBCConfig.scala | 4 +-- .../delayedqueue/scala/RetryConfig.scala | 26 +++++-------------- .../scala/DataStructuresPropertySpec.scala | 12 ++++++--- .../delayedqueue/scala/RetryConfigSpec.scala | 11 ++++++-- 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala index 89f2ed1..8c64d53 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/DelayedQueueJDBCConfig.scala @@ -63,8 +63,8 @@ final case class DelayedQueueJDBCConfig( tableName: String, time: DelayedQueueTimeConfig, queueName: String, - ackEnvSource: String = "", - retryPolicy: Option[RetryConfig] = None + ackEnvSource: String, + retryPolicy: Option[RetryConfig] ) { require(tableName.nonEmpty, "tableName must not be blank") require(queueName.nonEmpty, "queueName must not be blank") diff --git a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala index 2ef7efa..e967bd9 100644 --- a/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala +++ b/delayedqueue-scala/jvm/src/main/scala/org/funfix/delayedqueue/scala/RetryConfig.scala @@ -53,10 +53,10 @@ import org.funfix.delayedqueue.jvm final case class RetryConfig( initialDelay: Duration, maxDelay: Duration, - backoffFactor: Double = 2.0, - maxRetries: Option[Long] = None, - totalSoftTimeout: Option[Duration] = None, - perTryHardTimeout: Option[Duration] = None + backoffFactor: Double, + maxRetries: Option[Long], + totalSoftTimeout: Option[Duration], + perTryHardTimeout: Option[Duration] ) { require(backoffFactor >= 1.0, s"backoffFactor must be >= 1.0, got $backoffFactor") require(!initialDelay.isNegative, s"initialDelay must not be negative, got $initialDelay") @@ -94,25 +94,11 @@ object RetryConfig { * - 2.0 backoff factor (exponential doubling) */ val DEFAULT: RetryConfig = - RetryConfig( - maxRetries = Some(5), - totalSoftTimeout = Some(Duration.ofSeconds(30)), - perTryHardTimeout = Some(Duration.ofSeconds(10)), - initialDelay = Duration.ofMillis(100), - maxDelay = Duration.ofSeconds(5), - backoffFactor = 2.0 - ) + jvm.RetryConfig.DEFAULT.asScala /** No retries - operations fail immediately on first error. */ val NO_RETRIES: RetryConfig = - RetryConfig( - maxRetries = Some(0), - totalSoftTimeout = None, - perTryHardTimeout = None, - initialDelay = Duration.ZERO, - maxDelay = Duration.ZERO, - backoffFactor = 1.0 - ) + jvm.RetryConfig.NO_RETRIES.asScala /** Conversion extension for JVM RetryConfig. */ extension (javaConfig: jvm.RetryConfig) { diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala index 88e79ea..5858825 100644 --- a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/DataStructuresPropertySpec.scala @@ -138,7 +138,10 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { RetryConfig( initialDelay = java.time.Duration.ofMillis(100), maxDelay = java.time.Duration.ofMillis(1000), - backoffFactor = backoffFactor + backoffFactor = backoffFactor, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None ) } () @@ -146,9 +149,12 @@ class DataStructuresPropertySpec extends ScalaCheckSuite { val config = RetryConfig( initialDelay = java.time.Duration.ofMillis(100), maxDelay = java.time.Duration.ofMillis(1000), - backoffFactor = backoffFactor + backoffFactor = backoffFactor, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None ) - assert(config.backoffFactor >= 1.0) + assertEquals(config.backoffFactor >= 1.0, true) } } } diff --git a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala index 9485375..055299e 100644 --- a/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala +++ b/delayedqueue-scala/jvm/src/test/scala/org/funfix/delayedqueue/scala/RetryConfigSpec.scala @@ -55,7 +55,10 @@ class RetryConfigSpec extends munit.FunSuite { RetryConfig( initialDelay = Duration.ofMillis(100), maxDelay = Duration.ofSeconds(5), - backoffFactor = 0.5 + backoffFactor = 0.5, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None ) } } @@ -64,7 +67,11 @@ class RetryConfigSpec extends munit.FunSuite { intercept[IllegalArgumentException] { RetryConfig( initialDelay = Duration.ofMillis(-100), - maxDelay = Duration.ofSeconds(5) + maxDelay = Duration.ofSeconds(5), + backoffFactor = 2.0, + maxRetries = None, + totalSoftTimeout = None, + perTryHardTimeout = None ) } }