Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ lazy val delayedqueue = crossProject(JVMPlatform)
// Testing
"org.scalameta" %% "munit" % "1.0.4" % Test,
"org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test,
"org.scalacheck" %% "scalacheck" % "1.19.0" % Test,
"org.scalameta" %% "munit-scalacheck" % "1.2.0" % Test,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2026 Alexandru Nedelcu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.funfix.delayedqueue.scala

import cats.effect.IO
import java.time.Instant
import org.funfix.delayedqueue.jvm

/** Message envelope that includes an acknowledgment callback.
*
* This wrapper is returned when polling messages from the queue. It contains the message payload
* plus metadata and an acknowledgment function that should be called after processing completes.
*
* This type is not serializable.
*
* ==Example==
*
* {{{
* for {
* envelope <- queue.poll
* _ <- processMessage(envelope.payload)
* _ <- envelope.acknowledge
* } yield ()
* }}}
*
* @tparam A
* the type of the message payload
* @param payload
* the actual message content
* @param messageId
* unique identifier for tracking this message
* @param timestamp
* when this envelope was created (poll time)
* @param source
* identifier for the queue or source system
* @param deliveryType
* indicates whether this is the first delivery or a redelivery
* @param acknowledge
* IO action to call to acknowledge successful processing, and delete the message from the queue
*/
final case class AckEnvelope[+A](
payload: A,
messageId: MessageId,
timestamp: Instant,
source: String,
deliveryType: DeliveryType,
acknowledge: IO[Unit]
)

object AckEnvelope {

/** Conversion extension for JVM AckEnvelope. */
extension [A](javaEnv: jvm.AckEnvelope[A]) {

/** Converts a JVM AckEnvelope to a Scala AckEnvelope. */
def asScala: AckEnvelope[A] =
AckEnvelope(
payload = javaEnv.payload,
messageId = MessageId.asScala(javaEnv.messageId),
timestamp = javaEnv.timestamp,
source = javaEnv.source,
deliveryType = DeliveryType.asScala(javaEnv.deliveryType),
acknowledge = IO.blocking(javaEnv.acknowledge())
)
}
}

/** Unique identifier for a message. */
opaque type MessageId = String

object MessageId {

/** Creates a MessageId from a String value. */
def apply(value: String): MessageId = value

/** Conversion extension for String to MessageId. */
extension (id: MessageId) {

/** Gets the string value of the MessageId. */
def value: String = id

/** Converts this Scala MessageId to a JVM MessageId. */
def asJava: jvm.MessageId =
new jvm.MessageId(id)
}

/** Conversion extension for JVM MessageId. */
extension (javaId: jvm.MessageId) {

/** Converts a JVM MessageId to a Scala MessageId. */
def asScala: MessageId =
MessageId(javaId.value)
}
}

/** Indicates whether a message is being delivered for the first time or redelivered. */
enum DeliveryType {

/** Message is being delivered for the first time. */
case FirstDelivery

/** Message is being redelivered (was scheduled again after initial delivery). */
case Redelivery

/** Converts this Scala DeliveryType to a JVM DeliveryType. */
def asJava: jvm.DeliveryType = this match {
case FirstDelivery => jvm.DeliveryType.FIRST_DELIVERY
case Redelivery => jvm.DeliveryType.REDELIVERY
}
}

object DeliveryType {

/** Conversion extension for JVM DeliveryType. */
extension (javaType: jvm.DeliveryType) {

/** Converts a JVM DeliveryType to a Scala DeliveryType. */
def asScala: DeliveryType = javaType match {
case jvm.DeliveryType.FIRST_DELIVERY => DeliveryType.FirstDelivery
case jvm.DeliveryType.REDELIVERY => DeliveryType.Redelivery
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2026 Alexandru Nedelcu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.funfix.delayedqueue.scala

import org.funfix.delayedqueue.jvm

/** Hash of a cron configuration, used to detect configuration changes.
*
* When a cron schedule is installed, this hash is used to identify messages belonging to that
* configuration. If the configuration changes, the hash will differ, allowing the system to clean
* up old scheduled messages.
*/
opaque type CronConfigHash = String

object CronConfigHash {

/** Creates a CronConfigHash from a String value. */
def apply(value: String): CronConfigHash = value

/** Conversion extension for CronConfigHash. */
extension (hash: CronConfigHash) {

/** Gets the string value of the CronConfigHash. */
def value: String = hash

/** Converts this Scala CronConfigHash to a JVM CronConfigHash. */
def asJava: jvm.CronConfigHash =
new jvm.CronConfigHash(hash)
}

/** Creates a ConfigHash from a daily cron schedule configuration. */
def fromDailyCron(config: CronDailySchedule): CronConfigHash =
jvm.CronConfigHash.fromDailyCron(config.asJava).asScala

/** Creates a ConfigHash from a periodic tick configuration. */
def fromPeriodicTick(period: java.time.Duration): CronConfigHash =
jvm.CronConfigHash.fromPeriodicTick(period).asScala

/** Creates a ConfigHash from an arbitrary string. */
def fromString(text: String): CronConfigHash =
jvm.CronConfigHash.fromString(text).asScala

/** Conversion extension for JVM CronConfigHash. */
extension (javaHash: jvm.CronConfigHash) {

/** Converts a JVM CronConfigHash to a Scala CronConfigHash. */
def asScala: CronConfigHash =
CronConfigHash(javaHash.value)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2026 Alexandru Nedelcu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.funfix.delayedqueue.scala

import java.time.Duration
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneId
import org.funfix.delayedqueue.jvm
import scala.jdk.CollectionConverters.*

/** Configuration for daily recurring scheduled messages with timezone support.
*
* This class defines when messages should be scheduled each day, with support for multiple times
* per day and scheduling messages in advance.
*
* @param zoneId
* the timezone for interpreting the hours of day
* @param hoursOfDay
* the times during each day when messages should be scheduled (must not be empty)
* @param scheduleInAdvance
* how far in advance to schedule messages
* @param scheduleInterval
* how often to check and update the schedule
*/
final case class CronDailySchedule(
zoneId: ZoneId,
hoursOfDay: List[LocalTime],
scheduleInAdvance: Duration,
scheduleInterval: Duration
) {
require(hoursOfDay.nonEmpty, "hoursOfDay must not be empty")
require(
!scheduleInterval.isZero && !scheduleInterval.isNegative,
"scheduleInterval must be positive"
)

/** Calculates the next scheduled times starting from now.
*
* Returns all times that should be scheduled, from now until (now + scheduleInAdvance). Always
* returns at least one time (the next scheduled time), even if it's beyond scheduleInAdvance.
*
* @param now
* the current time
* @return
* list of future instants when messages should be scheduled (never empty)
*/
def getNextTimes(now: Instant): List[Instant] = {
import scala.jdk.CollectionConverters.*
asJava.getNextTimes(now).asScala.toList
}

/** Converts this Scala CronDailySchedule to a JVM CronDailySchedule. */
def asJava: jvm.CronDailySchedule =
new jvm.CronDailySchedule(
zoneId,
hoursOfDay.asJava,
scheduleInAdvance,
scheduleInterval
)
}

object CronDailySchedule {

/** Creates a DailyCronSchedule with the specified configuration. */
def create(
zoneId: ZoneId,
hoursOfDay: List[LocalTime],
scheduleInAdvance: Duration,
scheduleInterval: Duration
): CronDailySchedule =
CronDailySchedule(zoneId, hoursOfDay, scheduleInAdvance, scheduleInterval)

/** Conversion extension for JVM CronDailySchedule. */
extension (javaSchedule: jvm.CronDailySchedule) {

/** Converts a JVM CronDailySchedule to a Scala CronDailySchedule. */
def asScala: CronDailySchedule =
CronDailySchedule(
zoneId = javaSchedule.zoneId,
hoursOfDay = javaSchedule.hoursOfDay.asScala.toList,
scheduleInAdvance = javaSchedule.scheduleInAdvance,
scheduleInterval = javaSchedule.scheduleInterval
)
}
}
Loading