Skip to content

Commit eb2225d

Browse files
committed
[#2926] Roll back open transactions on close
There should not be any open transaction in progress when closing a connection. The risk is to leave open connections pending and exhaust the pool. Now, if the close operation detects an open transaction, it will roll back it and throw a validation error before closing the connection.
1 parent e26757d commit eb2225d

File tree

2 files changed

+76
-31
lines changed

2 files changed

+76
-31
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ public interface Log extends BasicLogger {
282282
@Message(id = 89, value = "Connection is closed")
283283
IllegalStateException connectionIsClosed();
284284

285+
@Message(id = 90, value = "Live transaction detected while closing the connection: it will be roll backed")
286+
IllegalStateException liveTransactionDetectedOnClose();
287+
288+
@Message(id = 91, value = "Can't begin a new transaction as an active transaction is already associated to this connection")
289+
IllegalStateException liveTransactionDetectedOnBeginTransaction();
290+
285291
// Same method that exists in CoreMessageLogger
286292
@LogMessage(level = WARN)
287293
@Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" )

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@
3636
import io.vertx.sqlclient.RowSet;
3737
import io.vertx.sqlclient.SqlConnection;
3838
import io.vertx.sqlclient.SqlResult;
39-
import io.vertx.sqlclient.Transaction;
4039
import io.vertx.sqlclient.Tuple;
4140
import io.vertx.sqlclient.spi.DatabaseMetadata;
4241

42+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
4343
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
44+
import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage;
4445
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
4546

4647
/**
@@ -62,7 +63,10 @@ public class SqlClientConnection implements ReactiveConnection {
6263
private final SqlConnection connection;
6364
// The context associated to the connection. We expect the connection to be executed in this context.
6465
private final ContextInternal connectionContext;
65-
private Transaction transaction;
66+
67+
// The close operation could be called multiple times if an error occurs,
68+
// if we execute it every time, we will have several useless messages in the log
69+
private boolean closed = false;
6670

6771
SqlClientConnection(
6872
SqlConnection connection,
@@ -362,52 +366,88 @@ private SqlConnection client() {
362366

363367
@Override
364368
public CompletionStage<Void> beginTransaction() {
365-
if ( transaction != null ) {
366-
throw new IllegalStateException( "Can't begin a new transaction as an active transaction is already associated to this connection" );
369+
if ( isTransactionInProgress() ) {
370+
return failedFuture( LOG.liveTransactionDetectedOnBeginTransaction() );
367371
}
368372
return connection.begin()
369373
.onSuccess( tx -> LOG.tracef( "Transaction started: %s", tx ) )
370-
.onFailure( v -> LOG.errorf( "Failed to start a transaction: %s", transaction ) )
374+
.onFailure( throwable -> LOG.errorf( "Failed to start a transaction: %s", throwable.getMessage() ) )
371375
.toCompletionStage()
372-
.thenAccept( this::setTransaction );
376+
.thenCompose( CompletionStages::voidFuture );
373377
}
374378

375379
@Override
376380
public CompletionStage<Void> commitTransaction() {
377-
return transaction.commit()
378-
.onSuccess( v -> LOG.tracef( "Transaction committed: %s", transaction ) )
379-
.onFailure( v -> LOG.errorf( "Failed to commit transaction: %s", transaction ) )
380-
.toCompletionStage()
381-
.whenComplete( this::clearTransaction );
381+
return connection.transaction()
382+
.commit()
383+
.onSuccess( v -> LOG.tracef( "Transaction committed: %s", connection.transaction() ) )
384+
.onFailure( throwable -> LOG.errorf( "Failed to commit transaction: %s", throwable.getMessage() ) )
385+
.toCompletionStage();
382386
}
383387

384388
@Override
385389
public CompletionStage<Void> rollbackTransaction() {
386-
return transaction.rollback()
387-
.onFailure( v -> LOG.errorf( "Failed to rollback transaction: %s", transaction ) )
388-
.onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", transaction ) )
389-
.toCompletionStage()
390-
.whenComplete( this::clearTransaction );
390+
if ( isTransactionInProgress() ) {
391+
return connection.transaction()
392+
.rollback()
393+
.onFailure( throwable -> LOG.errorf( "Failed to rollback transaction: %s", throwable.getMessage() ) )
394+
.onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", connection.transaction() ) )
395+
.toCompletionStage();
396+
}
397+
LOG.trace( "No transaction found to roll back" );
398+
return voidFuture();
391399
}
392400

393401
@Override
394402
public CompletionStage<Void> close() {
395-
if ( transaction != null ) {
396-
throw new IllegalStateException( "Connection being closed with a live transaction associated to it" );
397-
}
398-
return connection.close()
399-
.onSuccess( event -> LOG.tracef( "Connection closed: %s", connection ) )
400-
.onFailure( v -> LOG.errorf( "Failed to close a connection: %s", connection ) )
401-
.toCompletionStage();
402-
}
403-
404-
private void setTransaction(Transaction tx) {
405-
transaction = tx;
403+
// We can probably skip the validation if the connection is already closed...but, you never know
404+
return validateNoTransactionInProgressOnClose()
405+
.handle( CompletionStages::handle )
406+
.thenCompose( validationHandler -> supplyStage( () -> closed
407+
? voidFuture().thenAccept( v -> LOG.trace( "Connection already closed" ) )
408+
: connection.close().toCompletionStage() )
409+
.handle( CompletionStages::handle )
410+
.thenCompose( closeConnectionHandler -> {
411+
if ( closeConnectionHandler.hasFailed() ) {
412+
if ( validationHandler.hasFailed() ) {
413+
// Error closing the connection, include the validation error
414+
closeConnectionHandler.getThrowable()
415+
.addSuppressed( validationHandler.getThrowable() );
416+
}
417+
// Return a failed CompletionStage
418+
return closeConnectionHandler.getResultAsCompletionStage();
419+
}
420+
if ( !closed ) {
421+
closed = true;
422+
LOG.tracef( "Connection closed: %s", connection );
423+
}
424+
else {
425+
LOG.tracef( "Connection was already closed: %s", connection );
426+
}
427+
// Connection closed, return the result of the validation
428+
return validationHandler.getResultAsCompletionStage();
429+
} )
430+
);
406431
}
407432

408-
private void clearTransaction(Void v, Throwable x) {
409-
LOG.tracef( "Clearing current transaction instance from connection: %s", transaction );
410-
transaction = null;
433+
/**
434+
* If there's a transaction open, roll back it and return a failed CompletionStage.
435+
* The validation error is related to closing the connection.
436+
*/
437+
private CompletionStage<Void> validateNoTransactionInProgressOnClose() {
438+
if ( isTransactionInProgress() ) {
439+
return supplyStage( this::rollbackTransaction )
440+
.handle( CompletionStages::handle )
441+
.thenCompose( rollbackHandler -> {
442+
final Throwable validationError = LOG.liveTransactionDetectedOnClose();
443+
if ( rollbackHandler.hasFailed() ) {
444+
// Include the error that happened during rollback
445+
validationError.addSuppressed( rollbackHandler.getThrowable() );
446+
}
447+
return failedFuture( validationError );
448+
} );
449+
}
450+
return voidFuture();
411451
}
412452

413453
private static class RowSetResult implements Result {
@@ -460,5 +500,4 @@ private static void translateNulls(Object[] paramValues) {
460500
}
461501
}
462502
}
463-
464503
}

0 commit comments

Comments
 (0)