diff --git a/delayedqueue-jvm/api/delayedqueue-jvm.api b/delayedqueue-jvm/api/delayedqueue-jvm.api index 3d37851..a12ef36 100644 --- a/delayedqueue-jvm/api/delayedqueue-jvm.api +++ b/delayedqueue-jvm/api/delayedqueue-jvm.api @@ -399,7 +399,7 @@ public final class org/funfix/delayedqueue/jvm/OfferOutcome$Updated : org/funfix public fun toString ()Ljava/lang/String; } -public final class org/funfix/delayedqueue/jvm/ResourceUnavailableException : java/lang/Exception { +public final class org/funfix/delayedqueue/jvm/ResourceUnavailableException : java/io/IOException { public fun (Ljava/lang/String;Ljava/lang/Throwable;)V } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt index b035928..6d6ed9e 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt @@ -17,7 +17,6 @@ package org.funfix.delayedqueue.jvm import java.security.MessageDigest -import java.sql.SQLException import java.time.Clock import java.time.Instant import java.util.UUID @@ -45,8 +44,6 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.withConnection import org.funfix.delayedqueue.jvm.internals.jdbc.withDbRetries import org.funfix.delayedqueue.jvm.internals.jdbc.withTransaction -import org.funfix.delayedqueue.jvm.internals.utils.Raise -import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises import org.slf4j.LoggerFactory /** @@ -118,14 +115,9 @@ private constructor( * This method has Raise context for ResourceUnavailableException and InterruptedException, * which matches what the public API declares via @Throws. */ - context(_: Raise, _: Raise) - private fun withRetries( - block: - context(Raise, Raise) - () -> T - ): T { + private fun withRetries(block: () -> T): T { return if (config.retryPolicy == null) { - block(Raise._PRIVATE_AND_UNSAFE, Raise._PRIVATE_AND_UNSAFE) + block() } else { withDbRetries( config = config.retryPolicy, @@ -138,17 +130,16 @@ private constructor( @Throws(ResourceUnavailableException::class, InterruptedException::class) override fun offerOrUpdate(key: String, payload: A, scheduleAt: Instant): OfferOutcome = - unsafeSneakyRaises { - withRetries { offer(key, payload, scheduleAt, canUpdate = true) } + withRetries { + offer(key, payload, scheduleAt, canUpdate = true) } @Throws(ResourceUnavailableException::class, InterruptedException::class) override fun offerIfNotExists(key: String, payload: A, scheduleAt: Instant): OfferOutcome = - unsafeSneakyRaises { - withRetries { offer(key, payload, scheduleAt, canUpdate = false) } + withRetries { + offer(key, payload, scheduleAt, canUpdate = false) } - context(_: Raise, _: Raise) private fun offer( key: String, payload: A, @@ -232,11 +223,10 @@ private constructor( @Throws(ResourceUnavailableException::class, InterruptedException::class) override fun offerBatch(messages: List>): List> = - unsafeSneakyRaises { - withRetries { offerBatchImpl(messages) } + withRetries { + offerBatchImpl(messages) } - context(_: Raise, _: Raise) private fun offerBatchImpl( messages: List> ): List> { @@ -344,25 +334,20 @@ private constructor( } @Throws(ResourceUnavailableException::class, InterruptedException::class) - override fun tryPoll(): AckEnvelope? = unsafeSneakyRaises { withRetries { tryPollImpl() } } + override fun tryPoll(): AckEnvelope? = withRetries { tryPollImpl() } private fun acknowledgeByLockUuid(lockUuid: String): AcknowledgeFun = { - unsafeSneakyRaises { - withRetries { - database.withTransaction { conn -> adapter.deleteRowsWithLock(conn, lockUuid) } - } + withRetries { + database.withTransaction { conn -> adapter.deleteRowsWithLock(conn, lockUuid) } } } private fun acknowledgeByFingerprint(row: DBTableRowWithId): AcknowledgeFun = { - unsafeSneakyRaises { - withRetries { - database.withTransaction { conn -> adapter.deleteRowByFingerprint(conn, row) } - } + withRetries { + database.withTransaction { conn -> adapter.deleteRowByFingerprint(conn, row) } } } - context(_: Raise, _: Raise) private fun tryPollImpl(): AckEnvelope? { // Retry loop to handle failed acquires (concurrent modifications) // This matches the original Scala implementation which retries if acquire fails @@ -422,11 +407,10 @@ private constructor( } @Throws(ResourceUnavailableException::class, InterruptedException::class) - override fun tryPollMany(batchMaxSize: Int): AckEnvelope> = unsafeSneakyRaises { - withRetries { tryPollManyImpl(batchMaxSize) } + override fun tryPollMany(batchMaxSize: Int): AckEnvelope> = withRetries { + tryPollManyImpl(batchMaxSize) } - context(_: Raise, _: Raise) private fun tryPollManyImpl(batchMaxSize: Int): AckEnvelope> { // Handle edge case: non-positive batch size if (batchMaxSize <= 0) { @@ -508,11 +492,8 @@ private constructor( } @Throws(ResourceUnavailableException::class, InterruptedException::class) - override fun read(key: String): AckEnvelope? = unsafeSneakyRaises { - withRetries { readImpl(key) } - } + override fun read(key: String): AckEnvelope? = withRetries { readImpl(key) } - context(_: Raise, _: Raise) private fun readImpl(key: String): AckEnvelope? { return database.withConnection { connection -> val row = adapter.selectByKey(connection, pKind, key) ?: return@withConnection null @@ -539,19 +520,13 @@ private constructor( } @Throws(ResourceUnavailableException::class, InterruptedException::class) - override fun dropMessage(key: String): Boolean = unsafeSneakyRaises { - withRetries { - database.withTransaction { connection -> adapter.deleteOneRow(connection, key, pKind) } - } + override fun dropMessage(key: String): Boolean = withRetries { + database.withTransaction { connection -> adapter.deleteOneRow(connection, key, pKind) } } @Throws(ResourceUnavailableException::class, InterruptedException::class) - override fun containsMessage(key: String): Boolean = unsafeSneakyRaises { - withRetries { - database.withConnection { connection -> - adapter.checkIfKeyExists(connection, key, pKind) - } - } + override fun containsMessage(key: String): Boolean = withRetries { + database.withConnection { connection -> adapter.checkIfKeyExists(connection, key, pKind) } } @Throws( @@ -564,12 +539,8 @@ private constructor( "To drop all messages, you must provide the exact confirmation string" } - return unsafeSneakyRaises { - withRetries { - database.withTransaction { connection -> - adapter.dropAllMessages(connection, pKind) - } - } + return withRetries { + database.withTransaction { connection -> adapter.dropAllMessages(connection, pKind) } } } @@ -607,6 +578,8 @@ private constructor( public companion object { private val logger = LoggerFactory.getLogger(DelayedQueueJDBC::class.java) + private fun withRetries(block: () -> T): T = block() + /** * Runs database migrations for the specified configuration. * @@ -619,7 +592,7 @@ private constructor( */ @JvmStatic @Throws(ResourceUnavailableException::class, InterruptedException::class) - public fun runMigrations(config: DelayedQueueJDBCConfig): Unit = unsafeSneakyRaises { + public fun runMigrations(config: DelayedQueueJDBCConfig): Unit = withRetries { val database = Database(config.db) database.use { database.withConnection { connection -> @@ -671,7 +644,7 @@ private constructor( serializer: MessageSerializer, config: DelayedQueueJDBCConfig, clock: Clock = Clock.systemUTC(), - ): DelayedQueueJDBC = unsafeSneakyRaises { + ): DelayedQueueJDBC = withRetries { val database = Database(config.db) val adapter = SQLVendorAdapter.create(config.db.driver, config.tableName) DelayedQueueJDBC( diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/exceptions.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/exceptions.kt index bff549b..912c7b1 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/exceptions.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/exceptions.kt @@ -16,6 +16,8 @@ package org.funfix.delayedqueue.jvm +import java.io.IOException + /** * Checked exception thrown in case of exceptions happening that are not recoverable, rendering * DelayedQueue inaccessible. @@ -23,4 +25,4 @@ package org.funfix.delayedqueue.jvm * Example: issues with the RDBMS (bugs, or connection unavailable, failing after multiple retries) */ public class ResourceUnavailableException(message: String?, cause: Throwable?) : - Exception(message, cause) + IOException(message, cause) diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt index 0352650..4d95191 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt @@ -33,9 +33,6 @@ import org.funfix.delayedqueue.jvm.CronPayloadGenerator import org.funfix.delayedqueue.jvm.CronService import org.funfix.delayedqueue.jvm.DelayedQueue import org.funfix.delayedqueue.jvm.ResourceUnavailableException -import org.funfix.delayedqueue.jvm.internals.utils.Raise -import org.funfix.delayedqueue.jvm.internals.utils.runAndRecoverRaised -import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises import org.funfix.delayedqueue.jvm.internals.utils.withTimeout import org.slf4j.LoggerFactory @@ -45,9 +42,7 @@ import org.slf4j.LoggerFactory * Used by CronServiceImpl to delegate database operations to the DelayedQueue implementation while * maintaining proper exception flow tracking via Raise context. */ -internal typealias CronDeleteOperation = - context(Raise, Raise) - (CronConfigHash, String) -> Unit +internal typealias CronDeleteOperation = (CronConfigHash, String) -> Unit /** * Base implementation of CronService that can be used by both in-memory and JDBC implementations. @@ -66,19 +61,17 @@ internal class CronServiceImpl( keyPrefix: String, messages: List>, ) { - unsafeSneakyRaises { - installTick0( - configHash = configHash, - keyPrefix = keyPrefix, - messages = messages, - canUpdate = false, - ) - } + installTick0( + configHash = configHash, + keyPrefix = keyPrefix, + messages = messages, + canUpdate = false, + ) } @Throws(ResourceUnavailableException::class, InterruptedException::class) override fun uninstallTick(configHash: CronConfigHash, keyPrefix: String) { - unsafeSneakyRaises { deleteCurrentCron(configHash, keyPrefix) } + deleteCurrentCron(configHash, keyPrefix) } @Throws(ResourceUnavailableException::class, InterruptedException::class) @@ -157,7 +150,6 @@ internal class CronServiceImpl( * @param canUpdate whether to update existing messages (false for installTick, varies for * install) */ - context(_: Raise, _: Raise) private fun installTick0( configHash: CronConfigHash, keyPrefix: String, @@ -208,21 +200,17 @@ internal class CronServiceImpl( val task = Runnable { try { - runAndRecoverRaised({ - withTimeout(scheduleInterval) { - val now = clock.instant() - val firstRun = isFirst.getAndSet(false) - val messages = generateMany(now) - - installTick0( - configHash = configHash, - keyPrefix = keyPrefix, - messages = messages, - canUpdate = firstRun, - ) - } - }) { timeout -> - throw timeout + withTimeout(scheduleInterval) { + val now = clock.instant() + val firstRun = isFirst.getAndSet(false) + val messages = generateMany(now) + + installTick0( + configHash = configHash, + keyPrefix = keyPrefix, + messages = messages, + canUpdate = firstRun, + ) } } catch (e: Exception) { logger.error("Error in cron task for $keyPrefix", e) diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/Migration.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/Migration.kt index 2d040b6..78d3e3c 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/Migration.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/Migration.kt @@ -16,9 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc -import java.sql.SQLException -import org.funfix.delayedqueue.jvm.internals.utils.Raise - /** * Represents a database migration with SQL and a test to check if it needs to run. * @@ -91,7 +88,6 @@ internal object MigrationRunner { * @param migrations List of migrations to run * @return Number of migrations executed */ - context(_: Raise, _: Raise) fun runMigrations(conn: SafeConnection, migrations: List): Int { var executed = 0 for (migration in migrations) { diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt index 3636368..2386844 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt @@ -18,7 +18,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc import java.sql.PreparedStatement import java.sql.ResultSet -import java.sql.SQLException import java.sql.Types import java.time.Duration import java.time.Instant @@ -31,7 +30,6 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.mysql.MySQLAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteAdapter -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** * Describes actual SQL queries executed — can be overridden to provide driver-specific queries. @@ -43,8 +41,16 @@ import org.funfix.delayedqueue.jvm.internals.utils.Raise * @property tableName the name of the delayed queue table */ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val tableName: String) { + /** + * Inserts a single row into the database. Returns true if inserted, false if key already + * exists. + */ + open fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { + val inserted = insertBatch(conn, listOf(row)) + return inserted.isNotEmpty() + } + /** Checks if a key exists in the database. */ - context(_: Raise, _: Raise) fun checkIfKeyExists(conn: SafeConnection, key: String, kind: String): Boolean { val sql = """ @@ -58,17 +64,9 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } } - /** - * Inserts a single row into the database. Returns true if inserted, false if key already - * exists. - */ - context(_: Raise, _: Raise) - abstract fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean - /** * Inserts multiple rows in a batch. Returns the list of keys that were successfully inserted. */ - context(_: Raise, _: Raise) fun insertBatch(conn: SafeConnection, rows: List): List { if (rows.isEmpty()) return emptyList() @@ -130,7 +128,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * Updates an existing row with optimistic locking (compare-and-swap). Only updates if the * current row matches what's in the database. */ - context(_: Raise, _: Raise) fun guardedUpdate( conn: SafeConnection, currentRow: DBTableRow, @@ -167,7 +164,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } /** Selects one row by its key. */ - context(_: Raise, _: Raise) open fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? { val sql = """ @@ -206,7 +202,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * - MS-SQL: WITH (UPDLOCK) * - HSQLDB: Falls back to plain SELECT (limited row-level locking support) */ - context(_: Raise, _: Raise) abstract fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -219,7 +214,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * Returns the subset of keys that already exist in the database. This is used by batch * operations to avoid N+1 queries. */ - context(_: Raise, _: Raise) fun searchAvailableKeys(conn: SafeConnection, kind: String, keys: List): Set { if (keys.isEmpty()) return emptySet() @@ -248,7 +242,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } /** Deletes one row by key and kind. */ - context(_: Raise, _: Raise) fun deleteOneRow(conn: SafeConnection, key: String, kind: String): Boolean { val sql = """ @@ -265,7 +258,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } /** Deletes rows with a specific lock UUID. */ - context(_: Raise, _: Raise) fun deleteRowsWithLock(conn: SafeConnection, lockUuid: String): Int { val sql = """ @@ -279,7 +271,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } /** Deletes a row by its fingerprint (id and createdAt). */ - context(_: Raise, _: Raise) fun deleteRowByFingerprint(conn: SafeConnection, row: DBTableRowWithId): Boolean { val sql = """ @@ -298,7 +289,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t } /** Deletes all rows with a specific kind (used for cleanup in tests). */ - context(_: Raise, _: Raise) fun dropAllMessages(onnn: SafeConnection, kind: String): Int { val sql = """ @@ -315,7 +305,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * Deletes cron messages matching a specific config hash and key prefix. Used by uninstallTick * to remove the current cron configuration. */ - context(_: Raise, _: Raise) fun deleteCurrentCron( conn: SafeConnection, kind: String, @@ -340,7 +329,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * Deletes ALL cron messages with a given prefix (ignoring config hash). This is used as a * fallback or for complete cleanup of a prefix. */ - context(_: Raise, _: Raise) fun deleteAllForPrefix(conn: SafeConnection, kind: String, keyPrefix: String): Int { val sql = """ @@ -360,7 +348,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * installTick to remove outdated configurations while preserving the current one. This avoids * wasteful deletions when the configuration hasn't changed. */ - context(_: Raise, _: Raise) fun deleteOldCron( conn: SafeConnection, kind: String, @@ -386,7 +373,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * Acquires many messages optimistically by updating them with a lock. Returns the number of * messages acquired. */ - context(_: Raise, _: Raise) abstract fun acquireManyOptimistically( conn: SafeConnection, kind: String, @@ -397,7 +383,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t ): Int /** Selects the first available message for processing (with locking if supported). */ - context(_: Raise, _: Raise) abstract fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -405,7 +390,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t ): DBTableRowWithId? /** Selects all messages with a specific lock UUID. */ - context(_: Raise, _: Raise) open fun selectAllAvailableWithLock( conn: SafeConnection, lockUuid: String, @@ -448,7 +432,6 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t * successfully acquired. */ /** Acquires a row by updating its scheduledAt and lockUuid. */ - context(_: Raise, _: Raise) fun acquireRowByUpdate( conn: SafeConnection, row: DBTableRow, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/dbRetries.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/dbRetries.kt index a1f1c79..06732df 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/dbRetries.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/dbRetries.kt @@ -19,9 +19,7 @@ package org.funfix.delayedqueue.jvm.internals.jdbc import java.sql.SQLException import org.funfix.delayedqueue.jvm.ResourceUnavailableException import org.funfix.delayedqueue.jvm.RetryConfig -import org.funfix.delayedqueue.jvm.internals.utils.Raise import org.funfix.delayedqueue.jvm.internals.utils.RetryOutcome -import org.funfix.delayedqueue.jvm.internals.utils.raise import org.funfix.delayedqueue.jvm.internals.utils.withRetries /** @@ -41,14 +39,11 @@ import org.funfix.delayedqueue.jvm.internals.utils.withRetries * @throws ResourceUnavailableException if retries are exhausted or timeout occurs * @throws InterruptedException if the operation is interrupted */ -context(_: Raise, _: Raise) internal fun withDbRetries( config: RetryConfig, clock: java.time.Clock, filters: RdbmsExceptionFilters, - block: - context(Raise, Raise) - () -> T, + block: () -> T, ): T = try { withRetries( @@ -70,8 +65,8 @@ internal fun withDbRetries( } } }, - block = { block(Raise._PRIVATE_AND_UNSAFE, Raise._PRIVATE_AND_UNSAFE) }, + block = { block() }, ) } catch (e: java.util.concurrent.TimeoutException) { - raise(ResourceUnavailableException("Database operation timed out after retries", e)) + throw ResourceUnavailableException("Database operation timed out after retries", e) } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/h2/H2Adapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/h2/H2Adapter.kt index 53a89e1..8b72c86 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/h2/H2Adapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/h2/H2Adapter.kt @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.h2 -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver @@ -26,13 +25,11 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** H2-specific adapter. */ internal class H2Adapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -68,7 +65,6 @@ internal class H2Adapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { val sql = """ @@ -94,7 +90,6 @@ internal class H2Adapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -131,7 +126,6 @@ internal class H2Adapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt index 307d73d..d336dda 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver @@ -26,13 +25,11 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** HSQLDB-specific adapter. */ internal class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -43,7 +40,6 @@ internal class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : return selectByKey(conn, kind, key) } - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { // NOTE: it's fine if this INSERT fails with a duplicate key error, // since the call-site is supposed to handle it by catching the SQLException @@ -71,7 +67,6 @@ internal class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -107,7 +102,6 @@ internal class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/jdbc.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/jdbc.kt index b94fbf8..a594301 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/jdbc.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/jdbc.kt @@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException import javax.sql.DataSource import org.funfix.delayedqueue.jvm.JdbcConnectionConfig import org.funfix.delayedqueue.jvm.JdbcDriver -import org.funfix.delayedqueue.jvm.internals.utils.Raise import org.funfix.delayedqueue.jvm.internals.utils.runBlockingIO import org.funfix.tasks.jvm.Task import org.funfix.tasks.jvm.TaskCancellationException @@ -55,10 +54,8 @@ internal class Database( internal data class SafeConnection(val underlying: Connection, val driver: JdbcDriver) -context(_: Raise) internal inline fun runSQLOperation(block: () -> T): T = block() -context(_: Raise, _: Raise) internal fun Database.withConnection(block: (SafeConnection) -> T): T = runBlockingIO { runSQLOperation { source.connection.let { @@ -75,7 +72,6 @@ internal fun Database.withConnection(block: (SafeConnection) -> T): T = runB } } -context(_: Raise, _: Raise) internal fun Database.withTransaction(block: (SafeConnection) -> T) = withConnection { connection -> val autoCommit = connection.underlying.autoCommit @@ -96,25 +92,20 @@ internal fun Database.withTransaction(block: (SafeConnection) -> T) = } } -context(_: Raise, _: Raise) internal fun SafeConnection.execute(sql: String): Boolean = withStatement({ it.createStatement() }) { statement -> statement.execute(sql) } -context(_: Raise, _: Raise) internal fun SafeConnection.createStatement(block: (Statement) -> T): T = withStatement({ it.createStatement() }, block) -context(_: Raise, _: Raise) internal fun SafeConnection.prepareStatement(sql: String, block: (PreparedStatement) -> T): T = withStatement({ it.prepareStatement(sql.trimIndent()) }, block) -context(_: Raise, _: Raise) internal fun SafeConnection.query(sql: String, block: (ResultSet) -> T): T = withStatement({ it.prepareStatement(sql) }) { statement -> statement.executeQuery().use { resultSet -> block(resultSet) } } -context(_: Raise, _: Raise) internal fun SafeConnection.withStatement( createStatement: (Connection) -> Stm, block: (Stm) -> T, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MySQLCompatibleAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MySQLCompatibleAdapter.kt index ed4681c..463e4be 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MySQLCompatibleAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MySQLCompatibleAdapter.kt @@ -1,11 +1,9 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.mariadb -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver import org.funfix.delayedqueue.jvm.internals.jdbc.* -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** * Adapter for MySQL-compatible databases (MySQL and MariaDB). @@ -16,7 +14,6 @@ import org.funfix.delayedqueue.jvm.internals.utils.Raise internal open class MySQLCompatibleAdapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { val sql = """ @@ -43,7 +40,6 @@ internal open class MySQLCompatibleAdapter(driver: JdbcDriver, tableName: String } } - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -79,7 +75,6 @@ internal open class MySQLCompatibleAdapter(driver: JdbcDriver, tableName: String } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -116,7 +111,6 @@ internal open class MySQLCompatibleAdapter(driver: JdbcDriver, tableName: String } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt index df1e190..89b36a8 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.mssql -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver @@ -26,13 +25,11 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** MS-SQL-specific adapter. */ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { // NOTE: this query can still throw an SQLException, because the // IF NOT EXISTS check is not atomic. But this is still fine, as we @@ -71,7 +68,6 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -107,7 +103,6 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -144,7 +139,6 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, @@ -182,7 +176,6 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? { val sql = """ @@ -212,7 +205,6 @@ internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectAllAvailableWithLock( conn: SafeConnection, lockUuid: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt index 23b7c2c..2f4bad9 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/oracle/OracleAdapter.kt @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.oracle -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver @@ -26,13 +25,11 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** Oracle-specific adapter. */ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { // NOTE: this query can still throw an SQLException under concurrency, // because the NOT EXISTS check is not atomic. But this is still fine, @@ -70,7 +67,6 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -105,7 +101,6 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? { val sql = """ @@ -135,7 +130,6 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -176,7 +170,6 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, @@ -235,7 +228,6 @@ internal class OracleAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectAllAvailableWithLock( conn: SafeConnection, lockUuid: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/postgres/PostgreSQLAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/postgres/PostgreSQLAdapter.kt index 5a2f2db..e85dd18 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/postgres/PostgreSQLAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/postgres/PostgreSQLAdapter.kt @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.postgres -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver @@ -26,13 +25,11 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** PostgreSQL-specific adapter. */ internal class PostgreSQLAdapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { val sql = """ @@ -60,7 +57,6 @@ internal class PostgreSQLAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -96,7 +92,6 @@ internal class PostgreSQLAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -133,7 +128,6 @@ internal class PostgreSQLAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt index 8a7b89c..2a084cb 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt @@ -16,7 +16,6 @@ package org.funfix.delayedqueue.jvm.internals.jdbc.sqlite -import java.sql.SQLException import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver @@ -26,7 +25,6 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.utils.Raise /** * SQLite-specific adapter. @@ -40,7 +38,6 @@ import org.funfix.delayedqueue.jvm.internals.utils.Raise internal class SqliteAdapter(driver: JdbcDriver, tableName: String) : SQLVendorAdapter(driver, tableName) { - context(_: Raise, _: Raise) override fun selectForUpdateOneRow( conn: SafeConnection, kind: String, @@ -50,7 +47,6 @@ internal class SqliteAdapter(driver: JdbcDriver, tableName: String) : return selectByKey(conn, kind, key) } - context(_: Raise, _: Raise) override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean { // INSERT OR IGNORE is the idiomatic SQLite way to skip duplicate key inserts. val sql = @@ -78,7 +74,6 @@ internal class SqliteAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun selectFirstAvailableWithLock( conn: SafeConnection, kind: String, @@ -114,7 +109,6 @@ internal class SqliteAdapter(driver: JdbcDriver, tableName: String) : } } - context(_: Raise, _: Raise) override fun acquireManyOptimistically( conn: SafeConnection, kind: String, diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/execution.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/execution.kt index 2e450fb..cca6faf 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/execution.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/execution.kt @@ -22,7 +22,6 @@ import org.funfix.tasks.jvm.TaskExecutors internal val DB_EXECUTOR by lazy { TaskExecutors.sharedBlockingIO() } -context(_: Raise) internal fun runBlockingIO(block: () -> T): T { val fiber = Task.fromBlockingIO { block() }.ensureRunningOnExecutor(DB_EXECUTOR).runFiber() try { @@ -36,7 +35,6 @@ internal fun runBlockingIO(block: () -> T): T { } } -context(_: Raise) internal fun runBlockingIOUninterruptible(block: () -> T): T { val fiber = Task.fromBlockingIO { block() }.ensureRunningOnExecutor(DB_EXECUTOR).runFiber() diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/raise.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/raise.kt deleted file mode 100644 index 9c881b5..0000000 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/raise.kt +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright 2026 Alexandru Nedelcu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.funfix.delayedqueue.jvm.internals.utils - -/** - * A context parameter type that enables compile-time tracking of checked exceptions. - * - * ## Purpose - * - * The `Raise` context parameter allows functions to declare what checked exceptions they can throw - * in a way that the Kotlin type system can track. This is superior to traditional exception - * handling because: - * 1. **Compile-time safety**: The compiler ensures all exception paths are handled - * 2. **Explicit exception flow**: The type system documents exception propagation - * 3. **Java interop**: Maps cleanly to `@Throws` declarations for Java consumers - * - * ## Usage Pattern - * - * Functions that can raise exceptions declare a `context(Raise)` parameter: - * ```kotlin - * context(_: Raise) - * fun queryDatabase(): ResultSet { - * // Can throw SQLException - * return connection.executeQuery(sql) - * } - * - * context(_: Raise, _: Raise) - * fun complexOperation() { - * // Can throw both SQLException and InterruptedException - * queryDatabase() // Automatically gets the Raise context - * } - * ``` - * - * ## Architecture in DelayedQueue - * - * The library uses a layered exception handling approach: - * 1. **Internal methods** declare fine-grained `context(Raise, - * Raise)` - * - These are the methods that directly call `database.withConnection/withTransaction` - * - The type system tracks that SQLException can be raised - * 2. **Retry wrapper** (`withRetries`/`withDbRetries`) has - * `context(Raise, Raise)` - * - Catches SQLException and TimeoutException - * - Wraps them into ResourceUnavailableException after retries are exhausted - * - Type system knows it raises ResourceUnavailableException, not SQLException - * 3. **Public API methods** use `unsafeSneakyRaises` ONLY at the boundary - * - Declared with `@Throws(ResourceUnavailableException::class, InterruptedException::class)` - * - Call `unsafeSneakyRaises { withRetries { internalMethod() } }` - * - This suppresses the Raise context into the @Throws annotation for Java - * - * ## Contract - * - **NEVER use `unsafeSneakyRaises` in internal implementations** - * - It defeats the purpose of Raise by hiding exception flow from the type system - * - Only use at public API boundaries where `@Throws` declarations exist - * - Only use in tests where exception tracking is not needed - * - * @param E The exception type that can be raised - */ -@JvmInline -internal value class Raise private constructor(val fake: Nothing? = null) { - companion object { - val _PRIVATE_AND_UNSAFE: Raise = Raise() - } -} - -/** - * Raises an exception within a Raise context. - * - * This function can only be called when a `Raise` context is available, ensuring compile-time - * tracking of exception types. - * - * @param exception The exception to raise - * @return Never returns (always throws) - * @throws E Always throws the provided exception - */ -context(_: Raise) -internal inline fun raise(exception: E): Nothing = throw exception - -/** - * **DANGER: Only use at public API boundaries or in tests!** - * - * Provides a `Raise` context to a block, bypassing compile-time exception tracking. - * - * ## When to use - * 1. **Public API methods with @Throws declarations** ```kotlin - * - * @param block The code to execute with an unsafe Raise context - * @return The result of executing the block - * @Throws(ResourceUnavailableException::class, InterruptedException::class) override fun - * poll(): AckEnvelope = unsafeSneakyRaises { withRetries { internalPoll() } } ``` The - * `@Throws` annotation serves as the Java contract, and `unsafeSneakyRaises` suppresses the - * Raise context at the boundary. - * 2. **Tests where exception tracking is not needed** - * - * ## When NOT to use - * - **NEVER in internal implementations** - defeats the purpose of Raise - * - **NEVER when you can use proper Raise context** - always prefer explicit context - * - **NEVER to hide exception handling** - the type system should track exceptions - * - * ## Why it exists - * - * Kotlin's context receivers are not yet visible to Java, so we need a way to bridge between - * Kotlin's Raise context and Java's `@Throws` declarations at the public API. - */ -internal inline fun unsafeSneakyRaises( - block: - context(Raise) - () -> T -): T = block(Raise._PRIVATE_AND_UNSAFE) - -/** How to safely handle exceptions marked via the Raise context. */ -internal inline fun runAndRecoverRaised( - block: - context(Raise) - () -> T, - catch: (E) -> T, -): T = - try { - block(Raise._PRIVATE_AND_UNSAFE) - } catch (e: Exception) { - catch(e as E) - } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/retry.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/retry.kt index 4440198..0b818ec 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/retry.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/utils/retry.kt @@ -102,7 +102,6 @@ internal enum class RetryOutcome { RAISE, } -context(_: Raise, _: Raise) internal fun withRetries( config: RetryConfig, clock: Clock, @@ -114,9 +113,7 @@ internal fun withRetries( while (true) { try { return if (config.perTryHardTimeout != null) { - // Acceptable use of unsafeSneakyRaises, as it's being - // caught below and wrapped into ResourceUnavailableException - unsafeSneakyRaises { withTimeout(config.perTryHardTimeout) { block() } } + withTimeout(config.perTryHardTimeout) { block() } } else { block() } @@ -164,7 +161,6 @@ private fun createFinalException(state: Evolution, e: Exception, now: Instant): } } -context(_: Raise, _: Raise) internal fun withTimeout(timeout: Duration, block: () -> T): T { val task = org.funfix.tasks.jvm.Task.fromBlockingIO { block() } val fiber = task.ensureRunningOnExecutor(DB_EXECUTOR).runFiber() @@ -174,7 +170,7 @@ internal fun withTimeout(timeout: Duration, block: () -> T): T { } catch (e: TimeoutException) { fiber.cancel() fiber.joinBlockingUninterruptible() - raise(e) + throw e } catch (e: ExecutionException) { val cause = e.cause when { @@ -184,6 +180,6 @@ internal fun withTimeout(timeout: Duration, block: () -> T): T { } catch (e: InterruptedException) { fiber.cancel() fiber.joinBlockingUninterruptible() - raise(e) + throw e } } diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt index ba5b8ff..4055500 100644 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt @@ -50,14 +50,14 @@ class DatabaseTests { } @Test - fun `buildHikariConfig sets correct values`() = sneakyRunDB { + fun `buildHikariConfig sets correct values`() { val hikariConfig = ConnectionPool.buildHikariConfig(config) assertEquals(config.url, hikariConfig.jdbcUrl) assertEquals(config.driver.className, hikariConfig.driverClassName) } @Test - fun `createDataSource returns working DataSource`() = sneakyRunDB { + fun `createDataSource returns working DataSource`() { dataSource.connection.use { conn -> assertFalse(conn.isClosed) assertTrue(conn.metaData.driverName.contains("SQLite", ignoreCase = true)) @@ -65,7 +65,7 @@ class DatabaseTests { } @Test - fun `Database withConnection executes block and closes connection`() = sneakyRunDB { + fun `Database withConnection executes block and closes connection`() { var connectionClosedAfter: Boolean var connectionRef: SafeConnection? = null val result = @@ -81,7 +81,7 @@ class DatabaseTests { } @Test - fun `Database withTransaction commits on success`() = sneakyRunDB { + fun `Database withTransaction commits on success`() { database.withConnection { safeConn -> safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)") } @@ -99,18 +99,16 @@ class DatabaseTests { } @Test - fun `Database withTransaction rolls back on exception`() = sneakyRunDB { + fun `Database withTransaction rolls back on exception`() { database.withConnection { safeConn -> safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)") } assertThrows(SQLException::class.java) { - sneakyRunDB { - database.withTransaction { safeConn -> - safeConn.execute("INSERT INTO test (name) VALUES ('foo')") - // This will fail (duplicate primary key) - safeConn.execute("INSERT INTO test (id, name) VALUES (1, 'bar')") - safeConn.execute("INSERT INTO test (id, name) VALUES (1, 'baz')") - } + database.withTransaction { safeConn -> + safeConn.execute("INSERT INTO test (name) VALUES ('foo')") + // This will fail (duplicate primary key) + safeConn.execute("INSERT INTO test (id, name) VALUES (1, 'bar')") + safeConn.execute("INSERT INTO test (id, name) VALUES (1, 'baz')") } } val count = @@ -124,7 +122,7 @@ class DatabaseTests { } @Test - fun `Statement query executes block and returns result`() = sneakyRunDB { + fun `Statement query executes block and returns result`() { database.withConnection { safeConn -> safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)") safeConn.execute("INSERT INTO test (name) VALUES ('foo')") diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/ExecutionTests.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/ExecutionTests.kt index 0ad0c6a..446bd54 100644 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/ExecutionTests.kt +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/ExecutionTests.kt @@ -24,13 +24,13 @@ import org.opentest4j.AssertionFailedError class ExecutionTests { @Test - fun `runBlockingIO returns result`() = unsafeSneakyRaises { + fun `runBlockingIO returns result`() { val result = runBlockingIO { 42 } assertEquals(42, result) } @Test - fun `runBlockingIO propagates ExecutionException`() = unsafeSneakyRaises { + fun `runBlockingIO propagates ExecutionException`() { val ex = ExecutionException("fail", null) val thrown = assertThrows(ExecutionException::class.java) { runBlockingIO { throw ex } } assertEquals(ex, thrown) @@ -39,13 +39,11 @@ class ExecutionTests { @Test fun `runBlockingIO propagates InterruptedException as TaskCancellationException`() { val interrupted = InterruptedException("interrupted") - assertThrows(TaskCancellationException::class.java) { - unsafeSneakyRaises { runBlockingIO { throw interrupted } } - } + assertThrows(TaskCancellationException::class.java) { runBlockingIO { throw interrupted } } } @Test - fun `runBlockingIO runs on shared executor`() = unsafeSneakyRaises { + fun `runBlockingIO runs on shared executor`() { val threadName = runBlockingIO { Thread.currentThread().name } assertTrue(threadName.contains("virtual")) } @@ -53,18 +51,18 @@ class ExecutionTests { @Test fun `runBlockingIO hangs when block throws AssertionFailedError`() { assertThrows(AssertionFailedError::class.java) { - unsafeSneakyRaises { runBlockingIO { throw AssertionFailedError("boom") } } + runBlockingIO { throw AssertionFailedError("boom") } } } @Test - fun `runBlockingIOUninterruptible returns result`() = unsafeSneakyRaises { + fun `runBlockingIOUninterruptible returns result`() { val result = runBlockingIOUninterruptible { 99 } assertEquals(99, result) } @Test - fun `runBlockingIOUninterruptible propagates ExecutionException`() = unsafeSneakyRaises { + fun `runBlockingIOUninterruptible propagates ExecutionException`() { val ex = ExecutionException("fail", null) val thrown = assertThrows(ExecutionException::class.java) { @@ -74,13 +72,11 @@ class ExecutionTests { } @Test - fun `runBlockingIOUninterruptible propagates InterruptedException as TaskCancellationException`() = - unsafeSneakyRaises { - val interrupted = InterruptedException("interrupted") - // Should not throw InterruptedException, but wrap it - assertThrows(TaskCancellationException::class.java) { - runBlockingIOUninterruptible { throw interrupted } - } - Unit + fun `runBlockingIOUninterruptible propagates InterruptedException as TaskCancellationException`() { + val interrupted = InterruptedException("interrupted") + // Should not throw InterruptedException, but wrap it + assertThrows(TaskCancellationException::class.java) { + runBlockingIOUninterruptible { throw interrupted } } + } } diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RaiseTests.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RaiseTests.kt deleted file mode 100644 index 28480cd..0000000 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RaiseTests.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2026 Alexandru Nedelcu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.funfix.delayedqueue.jvm.internals.utils - -import java.io.IOException -import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test - -class RaiseTests { - @Test - fun `sneakyRaises provides context receiver`() { - val result = unsafeSneakyRaises { 123 } - assertEquals(123, result) - } - - @Test - fun `raise throws exception in context`() { - val thrown = - assertThrows(IOException::class.java) { - unsafeSneakyRaises { raise(IOException("fail")) } - } - assertEquals("fail", thrown.message) - } - - @Test - fun `sneakyRaises block can catch exception`() { - val result = - try { - unsafeSneakyRaises { raise(IllegalArgumentException("bad")) } - @Suppress("KotlinUnreachableCode") "no error" - } catch (e: IllegalArgumentException) { - e.message - } - assertEquals("bad", result) - } - - @Test - fun `Raise value class is internal and cannot be constructed externally`() { - // This test is just to ensure the API is not public - // Compilation will fail if you try: val r = Raise() - assertNotNull(Raise._PRIVATE_AND_UNSAFE) - } -} diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RetryTests.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RetryTests.kt index df3017f..00b2ebe 100644 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RetryTests.kt +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/RetryTests.kt @@ -180,16 +180,14 @@ class RetryTests { backoffFactor = 2.0, ) - unsafeSneakyRaises { - val result = - withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { - counter.incrementAndGet() - "success" - } + val result = + withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { + counter.incrementAndGet() + "success" + } - assertEquals("success", result) - assertEquals(1, counter.get()) - } + assertEquals("success", result) + assertEquals(1, counter.get()) } @Test @@ -205,19 +203,17 @@ class RetryTests { backoffFactor = 2.0, ) - unsafeSneakyRaises { - val result = - withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { - val count = counter.incrementAndGet() - if (count < 3) { - throw RuntimeException("transient failure") - } - "success" + val result = + withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { + val count = counter.incrementAndGet() + if (count < 3) { + throw RuntimeException("transient failure") } + "success" + } - assertEquals("success", result) - assertEquals(3, counter.get()) - } + assertEquals("success", result) + assertEquals(3, counter.get()) } @Test @@ -233,20 +229,18 @@ class RetryTests { backoffFactor = 2.0, ) - unsafeSneakyRaises { - val exception = - assertThrows(ResourceUnavailableException::class.java) { - withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RAISE }) { - counter.incrementAndGet() - throw RuntimeException("permanent failure") - } + val exception = + assertThrows(ResourceUnavailableException::class.java) { + withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RAISE }) { + counter.incrementAndGet() + throw RuntimeException("permanent failure") } + } - assertEquals(1, counter.get()) - assertTrue(exception.message!!.contains("Giving up after 0 retries")) - assertInstanceOf(RuntimeException::class.java, exception.cause) - assertEquals("permanent failure", exception.cause?.message) - } + assertEquals(1, counter.get()) + assertTrue(exception.message!!.contains("Giving up after 0 retries")) + assertInstanceOf(RuntimeException::class.java, exception.cause) + assertEquals("permanent failure", exception.cause?.message) } @Test @@ -262,20 +256,18 @@ class RetryTests { backoffFactor = 2.0, ) - unsafeSneakyRaises { - val exception = - assertThrows(ResourceUnavailableException::class.java) { - withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { - val attempt = counter.incrementAndGet() - throw RuntimeException("attempt $attempt failed") - } + val exception = + assertThrows(ResourceUnavailableException::class.java) { + withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { + val attempt = counter.incrementAndGet() + throw RuntimeException("attempt $attempt failed") } + } - assertEquals(4, counter.get()) // initial + 3 retries - assertTrue(exception.message!!.contains("Giving up after 3 retries")) - assertInstanceOf(RuntimeException::class.java, exception.cause) - assertEquals(3, exception.cause?.suppressed?.size) - } + assertEquals(4, counter.get()) // initial + 3 retries + assertTrue(exception.message!!.contains("Giving up after 3 retries")) + assertInstanceOf(RuntimeException::class.java, exception.cause) + assertEquals(3, exception.cause?.suppressed?.size) } @Test @@ -292,29 +284,27 @@ class RetryTests { backoffFactor = 2.0, ) - unsafeSneakyRaises { - assertThrows(ResourceUnavailableException::class.java) { - withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { - timestamps.add(System.currentTimeMillis()) - counter.incrementAndGet() - throw RuntimeException("always fails") - } + assertThrows(ResourceUnavailableException::class.java) { + withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { + timestamps.add(System.currentTimeMillis()) + counter.incrementAndGet() + throw RuntimeException("always fails") } + } - assertEquals(4, timestamps.size) - val delay1 = timestamps[1] - timestamps[0] - val delay2 = timestamps[2] - timestamps[1] - val delay3 = timestamps[3] - timestamps[2] + assertEquals(4, timestamps.size) + val delay1 = timestamps[1] - timestamps[0] + val delay2 = timestamps[2] - timestamps[1] + val delay3 = timestamps[3] - timestamps[2] - assertTrue(delay1 >= 40L) // ~50ms with some tolerance - assertTrue(delay1 < 150L) + assertTrue(delay1 >= 40L) // ~50ms with some tolerance + assertTrue(delay1 < 150L) - assertTrue(delay2 >= 90L) // ~100ms - assertTrue(delay2 < 250L) + assertTrue(delay2 >= 90L) // ~100ms + assertTrue(delay2 < 250L) - assertTrue(delay3 >= 190L) // ~200ms (capped) - assertTrue(delay3 < 350L) - } + assertTrue(delay3 >= 190L) // ~200ms (capped) + assertTrue(delay3 < 350L) } @Test @@ -330,18 +320,16 @@ class RetryTests { backoffFactor = 2.0, ) - unsafeSneakyRaises { - val exception = - assertThrows(java.util.concurrent.TimeoutException::class.java) { - withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { - counter.incrementAndGet() - Thread.sleep(500) - } + val exception = + assertThrows(java.util.concurrent.TimeoutException::class.java) { + withRetries(config, java.time.Clock.systemUTC(), { RetryOutcome.RETRY }) { + counter.incrementAndGet() + Thread.sleep(500) } + } - assertTrue(counter.get() >= 1) - assertTrue(exception.message!!.contains("Giving up")) - } + assertTrue(counter.get() >= 1) + assertTrue(exception.message!!.contains("Giving up")) } } } diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/TestHelpers.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/TestHelpers.kt index 3baf8f5..3c880be 100644 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/TestHelpers.kt +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/TestHelpers.kt @@ -16,15 +16,8 @@ package org.funfix.delayedqueue.jvm.internals.utils -import java.sql.SQLException - /** * Test helper to call database APIs that declare context receivers for checked exceptions. It * supplies unsafe Raise contexts for InterruptedException and SQLException so tests can call * internal methods without changing production code. */ -internal fun sneakyRunDB( - block: - context(Raise, Raise) - () -> T -): T = block(Raise._PRIVATE_AND_UNSAFE, Raise._PRIVATE_AND_UNSAFE)