Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion delayedqueue-jvm/api/delayedqueue-jvm.api
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public final class org/funfix/delayedqueue/jvm/OfferOutcome$Updated : org/funfix
public fun toString ()Ljava/lang/String;
}

public final class org/funfix/delayedqueue/jvm/ResourceUnavailableException : java/lang/Exception {
public final class org/funfix/delayedqueue/jvm/ResourceUnavailableException : java/io/IOException {
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.funfix.delayedqueue.jvm

import java.security.MessageDigest
import java.sql.SQLException
import java.time.Clock
import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -45,8 +44,6 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.withConnection
import org.funfix.delayedqueue.jvm.internals.jdbc.withDbRetries
import org.funfix.delayedqueue.jvm.internals.jdbc.withTransaction
import org.funfix.delayedqueue.jvm.internals.utils.Raise
import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises
import org.slf4j.LoggerFactory

/**
Expand Down Expand Up @@ -118,14 +115,9 @@ private constructor(
* This method has Raise context for ResourceUnavailableException and InterruptedException,
* which matches what the public API declares via @Throws.
*/
context(_: Raise<ResourceUnavailableException>, _: Raise<InterruptedException>)
private fun <T> withRetries(
block:
context(Raise<SQLException>, Raise<InterruptedException>)
() -> T
): T {
private fun <T> withRetries(block: () -> T): T {
return if (config.retryPolicy == null) {
block(Raise._PRIVATE_AND_UNSAFE, Raise._PRIVATE_AND_UNSAFE)
block()
} else {
withDbRetries(
config = config.retryPolicy,
Expand All @@ -138,17 +130,16 @@ private constructor(

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun offerOrUpdate(key: String, payload: A, scheduleAt: Instant): OfferOutcome =
unsafeSneakyRaises {
withRetries { offer(key, payload, scheduleAt, canUpdate = true) }
withRetries {
offer(key, payload, scheduleAt, canUpdate = true)
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun offerIfNotExists(key: String, payload: A, scheduleAt: Instant): OfferOutcome =
unsafeSneakyRaises {
withRetries { offer(key, payload, scheduleAt, canUpdate = false) }
withRetries {
offer(key, payload, scheduleAt, canUpdate = false)
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
private fun offer(
key: String,
payload: A,
Expand Down Expand Up @@ -232,11 +223,10 @@ private constructor(

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun <In> offerBatch(messages: List<BatchedMessage<In, A>>): List<BatchedReply<In, A>> =
unsafeSneakyRaises {
withRetries { offerBatchImpl(messages) }
withRetries {
offerBatchImpl(messages)
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
private fun <In> offerBatchImpl(
messages: List<BatchedMessage<In, A>>
): List<BatchedReply<In, A>> {
Expand Down Expand Up @@ -344,25 +334,20 @@ private constructor(
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun tryPoll(): AckEnvelope<A>? = unsafeSneakyRaises { withRetries { tryPollImpl() } }
override fun tryPoll(): AckEnvelope<A>? = withRetries { tryPollImpl() }

private fun acknowledgeByLockUuid(lockUuid: String): AcknowledgeFun = {
unsafeSneakyRaises {
withRetries {
database.withTransaction { conn -> adapter.deleteRowsWithLock(conn, lockUuid) }
}
withRetries {
database.withTransaction { conn -> adapter.deleteRowsWithLock(conn, lockUuid) }
}
}

private fun acknowledgeByFingerprint(row: DBTableRowWithId): AcknowledgeFun = {
unsafeSneakyRaises {
withRetries {
database.withTransaction { conn -> adapter.deleteRowByFingerprint(conn, row) }
}
withRetries {
database.withTransaction { conn -> adapter.deleteRowByFingerprint(conn, row) }
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
private fun tryPollImpl(): AckEnvelope<A>? {
// Retry loop to handle failed acquires (concurrent modifications)
// This matches the original Scala implementation which retries if acquire fails
Expand Down Expand Up @@ -422,11 +407,10 @@ private constructor(
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun tryPollMany(batchMaxSize: Int): AckEnvelope<List<A>> = unsafeSneakyRaises {
withRetries { tryPollManyImpl(batchMaxSize) }
override fun tryPollMany(batchMaxSize: Int): AckEnvelope<List<A>> = withRetries {
tryPollManyImpl(batchMaxSize)
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
private fun tryPollManyImpl(batchMaxSize: Int): AckEnvelope<List<A>> {
// Handle edge case: non-positive batch size
if (batchMaxSize <= 0) {
Expand Down Expand Up @@ -508,11 +492,8 @@ private constructor(
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun read(key: String): AckEnvelope<A>? = unsafeSneakyRaises {
withRetries { readImpl(key) }
}
override fun read(key: String): AckEnvelope<A>? = withRetries { readImpl(key) }

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
private fun readImpl(key: String): AckEnvelope<A>? {
return database.withConnection { connection ->
val row = adapter.selectByKey(connection, pKind, key) ?: return@withConnection null
Expand All @@ -539,19 +520,13 @@ private constructor(
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun dropMessage(key: String): Boolean = unsafeSneakyRaises {
withRetries {
database.withTransaction { connection -> adapter.deleteOneRow(connection, key, pKind) }
}
override fun dropMessage(key: String): Boolean = withRetries {
database.withTransaction { connection -> adapter.deleteOneRow(connection, key, pKind) }
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun containsMessage(key: String): Boolean = unsafeSneakyRaises {
withRetries {
database.withConnection { connection ->
adapter.checkIfKeyExists(connection, key, pKind)
}
}
override fun containsMessage(key: String): Boolean = withRetries {
database.withConnection { connection -> adapter.checkIfKeyExists(connection, key, pKind) }
}

@Throws(
Expand All @@ -564,12 +539,8 @@ private constructor(
"To drop all messages, you must provide the exact confirmation string"
}

return unsafeSneakyRaises {
withRetries {
database.withTransaction { connection ->
adapter.dropAllMessages(connection, pKind)
}
}
return withRetries {
database.withTransaction { connection -> adapter.dropAllMessages(connection, pKind) }
}
}

Expand Down Expand Up @@ -607,6 +578,8 @@ private constructor(
public companion object {
private val logger = LoggerFactory.getLogger(DelayedQueueJDBC::class.java)

private fun <T> withRetries(block: () -> T): T = block()

/**
* Runs database migrations for the specified configuration.
*
Expand All @@ -619,7 +592,7 @@ private constructor(
*/
@JvmStatic
@Throws(ResourceUnavailableException::class, InterruptedException::class)
public fun runMigrations(config: DelayedQueueJDBCConfig): Unit = unsafeSneakyRaises {
public fun runMigrations(config: DelayedQueueJDBCConfig): Unit = withRetries {
val database = Database(config.db)
database.use {
database.withConnection { connection ->
Expand Down Expand Up @@ -671,7 +644,7 @@ private constructor(
serializer: MessageSerializer<A>,
config: DelayedQueueJDBCConfig,
clock: Clock = Clock.systemUTC(),
): DelayedQueueJDBC<A> = unsafeSneakyRaises {
): DelayedQueueJDBC<A> = withRetries {
val database = Database(config.db)
val adapter = SQLVendorAdapter.create(config.db.driver, config.tableName)
DelayedQueueJDBC(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package org.funfix.delayedqueue.jvm

import java.io.IOException

/**
* Checked exception thrown in case of exceptions happening that are not recoverable, rendering
* DelayedQueue inaccessible.
*
* Example: issues with the RDBMS (bugs, or connection unavailable, failing after multiple retries)
*/
public class ResourceUnavailableException(message: String?, cause: Throwable?) :
Exception(message, cause)
IOException(message, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ import org.funfix.delayedqueue.jvm.CronPayloadGenerator
import org.funfix.delayedqueue.jvm.CronService
import org.funfix.delayedqueue.jvm.DelayedQueue
import org.funfix.delayedqueue.jvm.ResourceUnavailableException
import org.funfix.delayedqueue.jvm.internals.utils.Raise
import org.funfix.delayedqueue.jvm.internals.utils.runAndRecoverRaised
import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises
import org.funfix.delayedqueue.jvm.internals.utils.withTimeout
import org.slf4j.LoggerFactory

Expand All @@ -45,9 +42,7 @@ import org.slf4j.LoggerFactory
* Used by CronServiceImpl to delegate database operations to the DelayedQueue implementation while
* maintaining proper exception flow tracking via Raise context.
*/
internal typealias CronDeleteOperation =
context(Raise<ResourceUnavailableException>, Raise<InterruptedException>)
(CronConfigHash, String) -> Unit
internal typealias CronDeleteOperation = (CronConfigHash, String) -> Unit

/**
* Base implementation of CronService that can be used by both in-memory and JDBC implementations.
Expand All @@ -66,19 +61,17 @@ internal class CronServiceImpl<A>(
keyPrefix: String,
messages: List<CronMessage<A>>,
) {
unsafeSneakyRaises {
installTick0(
configHash = configHash,
keyPrefix = keyPrefix,
messages = messages,
canUpdate = false,
)
}
installTick0(
configHash = configHash,
keyPrefix = keyPrefix,
messages = messages,
canUpdate = false,
)
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
override fun uninstallTick(configHash: CronConfigHash, keyPrefix: String) {
unsafeSneakyRaises { deleteCurrentCron(configHash, keyPrefix) }
deleteCurrentCron(configHash, keyPrefix)
}

@Throws(ResourceUnavailableException::class, InterruptedException::class)
Expand Down Expand Up @@ -157,7 +150,6 @@ internal class CronServiceImpl<A>(
* @param canUpdate whether to update existing messages (false for installTick, varies for
* install)
*/
context(_: Raise<ResourceUnavailableException>, _: Raise<InterruptedException>)
private fun installTick0(
configHash: CronConfigHash,
keyPrefix: String,
Expand Down Expand Up @@ -208,21 +200,17 @@ internal class CronServiceImpl<A>(

val task = Runnable {
try {
runAndRecoverRaised({
withTimeout(scheduleInterval) {
val now = clock.instant()
val firstRun = isFirst.getAndSet(false)
val messages = generateMany(now)

installTick0(
configHash = configHash,
keyPrefix = keyPrefix,
messages = messages,
canUpdate = firstRun,
)
}
}) { timeout ->
throw timeout
withTimeout(scheduleInterval) {
val now = clock.instant()
val firstRun = isFirst.getAndSet(false)
val messages = generateMany(now)

installTick0(
configHash = configHash,
keyPrefix = keyPrefix,
messages = messages,
canUpdate = firstRun,
)
}
} catch (e: Exception) {
logger.error("Error in cron task for $keyPrefix", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package org.funfix.delayedqueue.jvm.internals.jdbc

import java.sql.SQLException
import org.funfix.delayedqueue.jvm.internals.utils.Raise

/**
* Represents a database migration with SQL and a test to check if it needs to run.
*
Expand Down Expand Up @@ -91,7 +88,6 @@ internal object MigrationRunner {
* @param migrations List of migrations to run
* @return Number of migrations executed
*/
context(_: Raise<InterruptedException>, _: Raise<SQLException>)
fun runMigrations(conn: SafeConnection, migrations: List<Migration>): Int {
var executed = 0
for (migration in migrations) {
Expand Down
Loading