3636import io .vertx .sqlclient .RowSet ;
3737import io .vertx .sqlclient .SqlConnection ;
3838import io .vertx .sqlclient .SqlResult ;
39- import io .vertx .sqlclient .Transaction ;
4039import io .vertx .sqlclient .Tuple ;
4140import io .vertx .sqlclient .spi .DatabaseMetadata ;
4241
42+ import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
4343import static org .hibernate .reactive .util .impl .CompletionStages .rethrow ;
44+ import static org .hibernate .reactive .util .impl .CompletionStages .supplyStage ;
4445import static org .hibernate .reactive .util .impl .CompletionStages .voidFuture ;
4546
4647/**
@@ -62,7 +63,10 @@ public class SqlClientConnection implements ReactiveConnection {
6263 private final SqlConnection connection ;
6364 // The context associated to the connection. We expect the connection to be executed in this context.
6465 private final ContextInternal connectionContext ;
65- private Transaction transaction ;
66+
67+ // The close operation could be called multiple times if an error occurs,
68+ // if we execute it every time, we will have several useless messages in the log
69+ private boolean closed = false ;
6670
6771 SqlClientConnection (
6872 SqlConnection connection ,
@@ -362,52 +366,88 @@ private SqlConnection client() {
362366
363367 @ Override
364368 public CompletionStage <Void > beginTransaction () {
365- if ( transaction != null ) {
366- throw new IllegalStateException ( "Can't begin a new transaction as an active transaction is already associated to this connection" );
369+ if ( isTransactionInProgress () ) {
370+ return failedFuture ( LOG . liveTransactionDetectedOnBeginTransaction () );
367371 }
368372 return connection .begin ()
369373 .onSuccess ( tx -> LOG .tracef ( "Transaction started: %s" , tx ) )
370- .onFailure ( v -> LOG .errorf ( "Failed to start a transaction: %s" , transaction ) )
374+ .onFailure ( throwable -> LOG .errorf ( "Failed to start a transaction: %s" , throwable . getMessage () ) )
371375 .toCompletionStage ()
372- .thenAccept ( this :: setTransaction );
376+ .thenCompose ( CompletionStages :: voidFuture );
373377 }
374378
375379 @ Override
376380 public CompletionStage <Void > commitTransaction () {
377- return transaction . commit ()
378- .onSuccess ( v -> LOG . tracef ( "Transaction committed: %s" , transaction ) )
379- .onFailure ( v -> LOG .errorf ( "Failed to commit transaction : %s" , transaction ) )
380- .toCompletionStage ( )
381- .whenComplete ( this :: clearTransaction );
381+ return connection . transaction ()
382+ .commit ( )
383+ .onSuccess ( v -> LOG .tracef ( "Transaction committed : %s" , connection . transaction () ) )
384+ .onFailure ( throwable -> LOG . errorf ( "Failed to commit transaction: %s" , throwable . getMessage () ) )
385+ .toCompletionStage ( );
382386 }
383387
384388 @ Override
385389 public CompletionStage <Void > rollbackTransaction () {
386- return transaction .rollback ()
387- .onFailure ( v -> LOG .errorf ( "Failed to rollback transaction: %s" , transaction ) )
388- .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , transaction ) )
389- .toCompletionStage ()
390- .whenComplete ( this ::clearTransaction );
390+ if ( isTransactionInProgress () ) {
391+ return connection .transaction ()
392+ .rollback ()
393+ .onFailure ( throwable -> LOG .errorf ( "Failed to rollback transaction: %s" , throwable .getMessage () ) )
394+ .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , connection .transaction () ) )
395+ .toCompletionStage ();
396+ }
397+ LOG .trace ( "No transaction found to roll back" );
398+ return voidFuture ();
391399 }
392400
393401 @ Override
394402 public CompletionStage <Void > close () {
395- if ( transaction != null ) {
396- throw new IllegalStateException ( "Connection being closed with a live transaction associated to it" );
397- }
398- return connection .close ()
399- .onSuccess ( event -> LOG .tracef ( "Connection closed: %s" , connection ) )
400- .onFailure ( v -> LOG .errorf ( "Failed to close a connection: %s" , connection ) )
401- .toCompletionStage ();
402- }
403-
404- private void setTransaction (Transaction tx ) {
405- transaction = tx ;
403+ // We can probably skip the validation if the connection is already closed...but, you never know
404+ return validateNoTransactionInProgressOnClose ()
405+ .handle ( CompletionStages ::handle )
406+ .thenCompose ( validationHandler -> supplyStage ( () -> closed
407+ ? voidFuture ().thenAccept ( v -> LOG .trace ( "Connection already closed" ) )
408+ : connection .close ().toCompletionStage () )
409+ .handle ( CompletionStages ::handle )
410+ .thenCompose ( closeConnectionHandler -> {
411+ if ( closeConnectionHandler .hasFailed () ) {
412+ if ( validationHandler .hasFailed () ) {
413+ // Error closing the connection, include the validation error
414+ closeConnectionHandler .getThrowable ()
415+ .addSuppressed ( validationHandler .getThrowable () );
416+ }
417+ // Return a failed CompletionStage
418+ return closeConnectionHandler .getResultAsCompletionStage ();
419+ }
420+ if ( !closed ) {
421+ closed = true ;
422+ LOG .tracef ( "Connection closed: %s" , connection );
423+ }
424+ else {
425+ LOG .tracef ( "Connection was already closed: %s" , connection );
426+ }
427+ // Connection closed, return the result of the validation
428+ return validationHandler .getResultAsCompletionStage ();
429+ } )
430+ );
406431 }
407432
408- private void clearTransaction (Void v , Throwable x ) {
409- LOG .tracef ( "Clearing current transaction instance from connection: %s" , transaction );
410- transaction = null ;
433+ /**
434+ * If there's a transaction open, roll back it and return a failed CompletionStage.
435+ * The validation error is related to closing the connection.
436+ */
437+ private CompletionStage <Void > validateNoTransactionInProgressOnClose () {
438+ if ( isTransactionInProgress () ) {
439+ return supplyStage ( this ::rollbackTransaction )
440+ .handle ( CompletionStages ::handle )
441+ .thenCompose ( rollbackHandler -> {
442+ final Throwable validationError = LOG .liveTransactionDetectedOnClose ();
443+ if ( rollbackHandler .hasFailed () ) {
444+ // Include the error that happened during rollback
445+ validationError .addSuppressed ( rollbackHandler .getThrowable () );
446+ }
447+ return failedFuture ( validationError );
448+ } );
449+ }
450+ return voidFuture ();
411451 }
412452
413453 private static class RowSetResult implements Result {
@@ -460,5 +500,4 @@ private static void translateNulls(Object[] paramValues) {
460500 }
461501 }
462502 }
463-
464503}
0 commit comments