Skip to content

Commit c75f23b

Browse files
committed
[#2926] Roll back open transactions on close
There should not be any open transaction in progress when closing a connection. The risk is to leave open connections pending and exhaust the pool. Now, if the close operation detects an open transaction, it will roll back it and throw a validation error before closing the connection.
1 parent e26757d commit c75f23b

File tree

2 files changed

+77
-31
lines changed

2 files changed

+77
-31
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ public interface Log extends BasicLogger {
282282
@Message(id = 89, value = "Connection is closed")
283283
IllegalStateException connectionIsClosed();
284284

285+
@Message(id = 90, value = "Live transaction detected while closing the connection: it will be roll backed")
286+
IllegalStateException liveTransactionDetectedOnClose();
287+
288+
@Message(id = 91, value = "Can't begin a new transaction as an active transaction is already associated to this connection")
289+
IllegalStateException liveTransactionDetectedOnBeginTransaction();
290+
285291
// Same method that exists in CoreMessageLogger
286292
@LogMessage(level = WARN)
287293
@Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" )

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@
3636
import io.vertx.sqlclient.RowSet;
3737
import io.vertx.sqlclient.SqlConnection;
3838
import io.vertx.sqlclient.SqlResult;
39-
import io.vertx.sqlclient.Transaction;
4039
import io.vertx.sqlclient.Tuple;
4140
import io.vertx.sqlclient.spi.DatabaseMetadata;
4241

42+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
4343
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
44+
import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage;
4445
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
4546

4647
/**
@@ -62,7 +63,11 @@ public class SqlClientConnection implements ReactiveConnection {
6263
private final SqlConnection connection;
6364
// The context associated to the connection. We expect the connection to be executed in this context.
6465
private final ContextInternal connectionContext;
65-
private Transaction transaction;
66+
67+
// We keep track of this because the close could be called multiple times if an error occurs, and this creates
68+
// several additional useless exception in the stacktrace
69+
private boolean closed = false;
70+
6671

6772
SqlClientConnection(
6873
SqlConnection connection,
@@ -362,52 +367,88 @@ private SqlConnection client() {
362367

363368
@Override
364369
public CompletionStage<Void> beginTransaction() {
365-
if ( transaction != null ) {
366-
throw new IllegalStateException( "Can't begin a new transaction as an active transaction is already associated to this connection" );
370+
if ( isTransactionInProgress() ) {
371+
return failedFuture( LOG.liveTransactionDetectedOnBeginTransaction() );
367372
}
368373
return connection.begin()
369374
.onSuccess( tx -> LOG.tracef( "Transaction started: %s", tx ) )
370-
.onFailure( v -> LOG.errorf( "Failed to start a transaction: %s", transaction ) )
375+
.onFailure( throwable -> LOG.errorf( "Failed to start a transaction: %s", throwable.getMessage() ) )
371376
.toCompletionStage()
372-
.thenAccept( this::setTransaction );
377+
.thenCompose( CompletionStages::voidFuture );
373378
}
374379

375380
@Override
376381
public CompletionStage<Void> commitTransaction() {
377-
return transaction.commit()
378-
.onSuccess( v -> LOG.tracef( "Transaction committed: %s", transaction ) )
379-
.onFailure( v -> LOG.errorf( "Failed to commit transaction: %s", transaction ) )
380-
.toCompletionStage()
381-
.whenComplete( this::clearTransaction );
382+
return connection.transaction()
383+
.commit()
384+
.onSuccess( v -> LOG.tracef( "Transaction committed: %s", connection.transaction() ) )
385+
.onFailure( throwable -> LOG.errorf( "Failed to commit transaction: %s", throwable.getMessage() ) )
386+
.toCompletionStage();
382387
}
383388

384389
@Override
385390
public CompletionStage<Void> rollbackTransaction() {
386-
return transaction.rollback()
387-
.onFailure( v -> LOG.errorf( "Failed to rollback transaction: %s", transaction ) )
388-
.onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", transaction ) )
389-
.toCompletionStage()
390-
.whenComplete( this::clearTransaction );
391+
if ( isTransactionInProgress() ) {
392+
return connection.transaction()
393+
.rollback()
394+
.onFailure( throwable -> LOG.errorf( "Failed to rollback transaction: %s", throwable.getMessage() ) )
395+
.onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", connection.transaction() ) )
396+
.toCompletionStage();
397+
}
398+
LOG.trace( "No transaction found to roll back" );
399+
return voidFuture();
391400
}
392401

393402
@Override
394403
public CompletionStage<Void> close() {
395-
if ( transaction != null ) {
396-
throw new IllegalStateException( "Connection being closed with a live transaction associated to it" );
397-
}
398-
return connection.close()
399-
.onSuccess( event -> LOG.tracef( "Connection closed: %s", connection ) )
400-
.onFailure( v -> LOG.errorf( "Failed to close a connection: %s", connection ) )
401-
.toCompletionStage();
402-
}
403-
404-
private void setTransaction(Transaction tx) {
405-
transaction = tx;
404+
// We can probably skip the validation if the connection is already closed...but, you never know
405+
return validateNoTransactionInProgressOnClose()
406+
.handle( CompletionStages::handle )
407+
.thenCompose( validationHandler -> supplyStage( () -> closed
408+
? voidFuture().thenAccept( v -> LOG.trace( "Connection already closed" ) )
409+
: connection.close().toCompletionStage() )
410+
.handle( CompletionStages::handle )
411+
.thenCompose( closeConnectionHandler -> {
412+
if ( closeConnectionHandler.hasFailed() ) {
413+
if ( validationHandler.hasFailed() ) {
414+
// Error closing the connection, include the validation error
415+
closeConnectionHandler.getThrowable()
416+
.addSuppressed( validationHandler.getThrowable() );
417+
}
418+
// Return a failed CompletionStage
419+
return closeConnectionHandler.getResultAsCompletionStage();
420+
}
421+
if ( !closed ) {
422+
closed = true;
423+
LOG.tracef( "Connection closed: %s", connection );
424+
}
425+
else {
426+
LOG.tracef( "Connection was already closed: %s", connection );
427+
}
428+
// Connection closed, return the result of the validation
429+
return validationHandler.getResultAsCompletionStage();
430+
} )
431+
);
406432
}
407433

408-
private void clearTransaction(Void v, Throwable x) {
409-
LOG.tracef( "Clearing current transaction instance from connection: %s", transaction );
410-
transaction = null;
434+
/**
435+
* If there's a transaction open, roll back it and return a failed CompletionStage.
436+
* The validation error is related to closing the connection.
437+
*/
438+
private CompletionStage<Void> validateNoTransactionInProgressOnClose() {
439+
if ( isTransactionInProgress() ) {
440+
return supplyStage( this::rollbackTransaction )
441+
.handle( CompletionStages::handle )
442+
.thenCompose( rollbackHandler -> {
443+
final Throwable validationError = LOG.liveTransactionDetectedOnClose();
444+
if ( rollbackHandler.hasFailed() ) {
445+
// Include the error that happened during rollback
446+
validationError.addSuppressed( rollbackHandler.getThrowable() );
447+
}
448+
return failedFuture( validationError );
449+
} );
450+
}
451+
return voidFuture();
411452
}
412453

413454
private static class RowSetResult implements Result {
@@ -460,5 +501,4 @@ private static void translateNulls(Object[] paramValues) {
460501
}
461502
}
462503
}
463-
464504
}

0 commit comments

Comments
 (0)