From e26757d575ed52cef6e4f7c5d3520f9d121ed8d7 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Thu, 18 Dec 2025 11:58:10 +0100 Subject: [PATCH 1/3] [#2926] Minor refactoring in MutinySessionImpl * Rename variable "rollback" to "markedForRollback" * Change formattation --- .../reactive/mutiny/impl/MutinySessionImpl.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java index 1f74de84a..939e3d2aa 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java @@ -488,7 +488,7 @@ public Mutiny.Transaction currentTransaction() { } private class Transaction implements Mutiny.Transaction { - boolean rollback; + boolean markedForRollback; Uni execute(Function> work) { currentTransaction = this; @@ -504,13 +504,16 @@ Uni executeInTransaction(Function> work) { return Uni.createFrom() .deferred( () -> work.apply( this ) ) // only flush() if the work completed with no exception - .call( this::flush ).call( this::beforeCompletion ) + .call( this::flush ) + .call( this::beforeCompletion ) // in the case of an exception or cancellation // we need to roll back the transaction - .onFailure().call( this::rollback ).onCancellation().call( this::rollback ) + .onFailure().call( this::rollback ) + .onCancellation().call( this::rollback ) // finally, when there was no exception, // commit or rollback the transaction - .call( () -> rollback ? rollback() : commit() ).call( this::afterCompletion ); + .call( () -> markedForRollback ? rollback() : commit() ) + .call( this::afterCompletion ); } Uni flush() { @@ -534,17 +537,17 @@ private Uni beforeCompletion() { } private Uni afterCompletion() { - return Uni.createFrom().completionStage( delegate.getReactiveActionQueue().afterTransactionCompletion( !rollback ) ); + return Uni.createFrom().completionStage( delegate.getReactiveActionQueue().afterTransactionCompletion( !markedForRollback ) ); } @Override public void markForRollback() { - rollback = true; + markedForRollback = true; } @Override public boolean isMarkedForRollback() { - return rollback; + return markedForRollback; } } From eb2225d1bd1d7d13f65500b9382ac4e4502e3536 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Thu, 18 Dec 2025 10:21:03 +0100 Subject: [PATCH 2/3] [#2926] Roll back open transactions on close There should not be any open transaction in progress when closing a connection. The risk is to leave open connections pending and exhaust the pool. Now, if the close operation detects an open transaction, it will roll back it and throw a validation error before closing the connection. --- .../hibernate/reactive/logging/impl/Log.java | 6 ++ .../pool/impl/SqlClientConnection.java | 101 ++++++++++++------ 2 files changed, 76 insertions(+), 31 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java index b83d2089f..466562a68 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java @@ -282,6 +282,12 @@ public interface Log extends BasicLogger { @Message(id = 89, value = "Connection is closed") IllegalStateException connectionIsClosed(); + @Message(id = 90, value = "Live transaction detected while closing the connection: it will be roll backed") + IllegalStateException liveTransactionDetectedOnClose(); + + @Message(id = 91, value = "Can't begin a new transaction as an active transaction is already associated to this connection") + IllegalStateException liveTransactionDetectedOnBeginTransaction(); + // Same method that exists in CoreMessageLogger @LogMessage(level = WARN) @Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" ) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java index 413d2a50b..6b6e9b113 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java @@ -36,11 +36,12 @@ import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.SqlResult; -import io.vertx.sqlclient.Transaction; import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.spi.DatabaseMetadata; +import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; +import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; /** @@ -62,7 +63,10 @@ public class SqlClientConnection implements ReactiveConnection { private final SqlConnection connection; // The context associated to the connection. We expect the connection to be executed in this context. private final ContextInternal connectionContext; - private Transaction transaction; + + // The close operation could be called multiple times if an error occurs, + // if we execute it every time, we will have several useless messages in the log + private boolean closed = false; SqlClientConnection( SqlConnection connection, @@ -362,52 +366,88 @@ private SqlConnection client() { @Override public CompletionStage beginTransaction() { - if ( transaction != null ) { - throw new IllegalStateException( "Can't begin a new transaction as an active transaction is already associated to this connection" ); + if ( isTransactionInProgress() ) { + return failedFuture( LOG.liveTransactionDetectedOnBeginTransaction() ); } return connection.begin() .onSuccess( tx -> LOG.tracef( "Transaction started: %s", tx ) ) - .onFailure( v -> LOG.errorf( "Failed to start a transaction: %s", transaction ) ) + .onFailure( throwable -> LOG.errorf( "Failed to start a transaction: %s", throwable.getMessage() ) ) .toCompletionStage() - .thenAccept( this::setTransaction ); + .thenCompose( CompletionStages::voidFuture ); } @Override public CompletionStage commitTransaction() { - return transaction.commit() - .onSuccess( v -> LOG.tracef( "Transaction committed: %s", transaction ) ) - .onFailure( v -> LOG.errorf( "Failed to commit transaction: %s", transaction ) ) - .toCompletionStage() - .whenComplete( this::clearTransaction ); + return connection.transaction() + .commit() + .onSuccess( v -> LOG.tracef( "Transaction committed: %s", connection.transaction() ) ) + .onFailure( throwable -> LOG.errorf( "Failed to commit transaction: %s", throwable.getMessage() ) ) + .toCompletionStage(); } @Override public CompletionStage rollbackTransaction() { - return transaction.rollback() - .onFailure( v -> LOG.errorf( "Failed to rollback transaction: %s", transaction ) ) - .onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", transaction ) ) - .toCompletionStage() - .whenComplete( this::clearTransaction ); + if ( isTransactionInProgress() ) { + return connection.transaction() + .rollback() + .onFailure( throwable -> LOG.errorf( "Failed to rollback transaction: %s", throwable.getMessage() ) ) + .onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", connection.transaction() ) ) + .toCompletionStage(); + } + LOG.trace( "No transaction found to roll back" ); + return voidFuture(); } @Override public CompletionStage close() { - if ( transaction != null ) { - throw new IllegalStateException( "Connection being closed with a live transaction associated to it" ); - } - return connection.close() - .onSuccess( event -> LOG.tracef( "Connection closed: %s", connection ) ) - .onFailure( v -> LOG.errorf( "Failed to close a connection: %s", connection ) ) - .toCompletionStage(); - } - - private void setTransaction(Transaction tx) { - transaction = tx; + // We can probably skip the validation if the connection is already closed...but, you never know + return validateNoTransactionInProgressOnClose() + .handle( CompletionStages::handle ) + .thenCompose( validationHandler -> supplyStage( () -> closed + ? voidFuture().thenAccept( v -> LOG.trace( "Connection already closed" ) ) + : connection.close().toCompletionStage() ) + .handle( CompletionStages::handle ) + .thenCompose( closeConnectionHandler -> { + if ( closeConnectionHandler.hasFailed() ) { + if ( validationHandler.hasFailed() ) { + // Error closing the connection, include the validation error + closeConnectionHandler.getThrowable() + .addSuppressed( validationHandler.getThrowable() ); + } + // Return a failed CompletionStage + return closeConnectionHandler.getResultAsCompletionStage(); + } + if ( !closed ) { + closed = true; + LOG.tracef( "Connection closed: %s", connection ); + } + else { + LOG.tracef( "Connection was already closed: %s", connection ); + } + // Connection closed, return the result of the validation + return validationHandler.getResultAsCompletionStage(); + } ) + ); } - private void clearTransaction(Void v, Throwable x) { - LOG.tracef( "Clearing current transaction instance from connection: %s", transaction ); - transaction = null; + /** + * If there's a transaction open, roll back it and return a failed CompletionStage. + * The validation error is related to closing the connection. + */ + private CompletionStage validateNoTransactionInProgressOnClose() { + if ( isTransactionInProgress() ) { + return supplyStage( this::rollbackTransaction ) + .handle( CompletionStages::handle ) + .thenCompose( rollbackHandler -> { + final Throwable validationError = LOG.liveTransactionDetectedOnClose(); + if ( rollbackHandler.hasFailed() ) { + // Include the error that happened during rollback + validationError.addSuppressed( rollbackHandler.getThrowable() ); + } + return failedFuture( validationError ); + } ); + } + return voidFuture(); } private static class RowSetResult implements Result { @@ -460,5 +500,4 @@ private static void translateNulls(Object[] paramValues) { } } } - } From ebd9ea57397d568651bd09fbe4461a51269c4e9c Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Thu, 18 Dec 2025 10:58:04 +0100 Subject: [PATCH 3/3] [#2926] Test error when there's an unexpected transaction --- .../NoLiveTransactionValidationErrorTest.java | 189 ++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 hibernate-reactive-core/src/test/java/org/hibernate/reactive/NoLiveTransactionValidationErrorTest.java diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/NoLiveTransactionValidationErrorTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/NoLiveTransactionValidationErrorTest.java new file mode 100644 index 000000000..bba0a94bd --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/NoLiveTransactionValidationErrorTest.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletionStage; + +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.stage.Stage; +import org.hibernate.reactive.stage.impl.StageSessionImpl; +import org.hibernate.reactive.util.impl.CompletionStages; + +import org.junit.jupiter.api.Test; + +import io.vertx.junit5.VertxTestContext; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hibernate.reactive.testing.ReactiveAssertions.assertThrown; + +public class NoLiveTransactionValidationErrorTest extends BaseReactiveTest { + + @Override + protected Collection> annotatedEntities() { + return List.of( Comic.class ); + } + + @Test + public void beginTransactionError(VertxTestContext context) { + final Stage.Session[] session = { null }; + test( context, assertThrown( IllegalStateException.class, getSessionFactory() + .openSession() + .thenCompose( s -> { + session[0] = s; + ReactiveConnection connection = reactiveConnection( s ); + return connection.beginTransaction() + .thenCompose( t1 -> connection.beginTransaction() ); + } ) + ) + .thenAccept( e -> { + assertThat( e ) + .hasMessageContaining( "HR000091" ) + .hasMessageContaining( "Can't begin a new transaction" ); + } ) + .handle( CompletionStages::handle ) + // We try to close the session we opened + .thenCompose( assertionHandler -> close( session[0] ) + .thenCompose( assertionHandler::getResultAsCompletionStage ) ) + ); + } + + // Try to close the session ignoring any error + private static CompletionStage close( Stage.Session session) { + return closeSession( session ).handle( (unused, throwable) -> null ); + } + + private static ReactiveConnection reactiveConnection(Stage.Session s) { + return ( (StageSessionImpl) s ).getReactiveConnection(); + } + + @Test + public void rollbackOnCloseWithStage(VertxTestContext context) { + Comic beneath = new Comic( "979-8887241081", "Beneath The Trees Where Nobody Sees" ); + + test( + context, + assertThrown( IllegalStateException.class, getSessionFactory() + .withTransaction( s -> s + .persist( beneath ) + .thenCompose( v -> s.flush() ) + // Close the connection before committing + .thenCompose( v -> s.close() ) + ) + ) + .thenAccept( e -> assertThat( e ) + .hasMessageContaining( "HR000090" ) + .hasMessageContaining( "closing the connection" ) + ) + .thenCompose( v -> getSessionFactory() + .withTransaction( s -> s.find( Comic.class, beneath.isbn ) ) + ) + .thenAccept( result -> assertThat( result ) + .as( "The persist should have been roll backed" ) + .isNull() ) + ); + } + + @Test + public void rollbackOnErrorWithStage(VertxTestContext context) { + Comic beneath = new Comic( "979-8887241081", "Beneath The Trees Where Nobody Sees" ); + final RuntimeException ohNo = new RuntimeException( "Oh, no!" ); + test( + context, + assertThrown( RuntimeException.class, getSessionFactory() + .withTransaction( s -> s + .persist( beneath ) + .thenCompose( v -> s.flush() ) + // Close the connection before committing + .thenAccept( v -> { + throw ohNo; + } ) + ) + ) + .thenAccept( e -> assertThat( e ).hasMessageContaining( ohNo.getMessage() ) ) + .thenCompose( v -> getSessionFactory() + .withTransaction( s -> s.find( Comic.class, beneath.isbn ) ) + ) + .thenAccept( result -> assertThat( result ) + .as( "The persist should have been roll backed" ) + .isNull() ) + ); + } + + @Test + public void rollbackOnClose(VertxTestContext context) { + Comic beneath = new Comic( "979-8887241081", "Beneath The Trees Where Nobody Sees" ); + + test( + context, + assertThrown( IllegalStateException.class, getMutinySessionFactory() + .withTransaction( s -> s + .persist( beneath ) + .call( s::flush ) + // Close the connection before committing + .call( s::close ) + ) + ) + .invoke( e -> assertThat( e ) + .hasMessageContaining( "HR000090" ) + .hasMessageContaining( "closing the connection" ) + ) + .chain( () -> getMutinySessionFactory() + .withTransaction( s -> s.find( Comic.class, beneath.isbn ) ) + ) + .invoke( result -> assertThat( result ) + .as( "The persist should have been roll backed" ) + .isNull() ) + ); + } + + @Test + public void rollbackOnError(VertxTestContext context) { + Comic beneath = new Comic( "979-8887241081", "Beneath The Trees Where Nobody Sees" ); + final RuntimeException ohNo = new RuntimeException( "Oh, no!" ); + test( + context, + assertThrown( RuntimeException.class, getMutinySessionFactory() + .withTransaction( s -> s + .persist( beneath ) + .call( s::flush ) + .chain( () -> { + throw ohNo; + } ) + ) + ) + .invoke( e -> assertThat( e ).hasMessageContaining( ohNo.getMessage() ) ) + .chain( () -> getMutinySessionFactory() + .withTransaction( s -> s.find( Comic.class, beneath.isbn ) ) + ) + .invoke( result -> assertThat( result ) + .as( "The persist should have been roll backed" ) + .isNull() ) + ); + } + + @Entity + public static class Comic { + @Id + public String isbn; + public String title; + + public Comic() { + } + + public Comic(String iban, String title) { + this.isbn = iban; + this.title = title; + } + + @Override + public String toString() { + return isbn + ":" + title; + } + } +}