3636import io .vertx .sqlclient .RowSet ;
3737import io .vertx .sqlclient .SqlConnection ;
3838import io .vertx .sqlclient .SqlResult ;
39- import io .vertx .sqlclient .Transaction ;
4039import io .vertx .sqlclient .Tuple ;
4140import io .vertx .sqlclient .spi .DatabaseMetadata ;
4241
42+ import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
4343import static org .hibernate .reactive .util .impl .CompletionStages .rethrow ;
44+ import static org .hibernate .reactive .util .impl .CompletionStages .supplyStage ;
4445import static org .hibernate .reactive .util .impl .CompletionStages .voidFuture ;
4546
4647/**
@@ -62,7 +63,10 @@ public class SqlClientConnection implements ReactiveConnection {
6263 private final SqlConnection connection ;
6364 // The context associated to the connection. We expect the connection to be executed in this context.
6465 private final ContextInternal connectionContext ;
65- private Transaction transaction ;
66+
67+ // The close operation could be called multiple times if an error occurs,
68+ // if we execute it every time, we will have several useless messages in the log
69+ private boolean closed = false ;
6670
6771 SqlClientConnection (
6872 SqlConnection connection ,
@@ -301,43 +305,88 @@ private SqlConnection client() {
301305
302306 @ Override
303307 public CompletionStage <Void > beginTransaction () {
304- if ( transaction != null ) {
305- throw new IllegalStateException ( "Can't begin a new transaction as an active transaction is already associated to this connection" );
308+ if ( isTransactionInProgress () ) {
309+ return failedFuture ( LOG . liveTransactionDetectedOnBeginTransaction () );
306310 }
307311 return connection .begin ()
308312 .onSuccess ( tx -> LOG .tracef ( "Transaction started: %s" , tx ) )
309- .onFailure ( v -> LOG .errorf ( "Failed to start a transaction: %s" , transaction ) )
313+ .onFailure ( throwable -> LOG .errorf ( "Failed to start a transaction: %s" , throwable . getMessage () ) )
310314 .toCompletionStage ()
311- .thenAccept ( this :: setTransaction );
315+ .thenCompose ( CompletionStages :: voidFuture );
312316 }
313317
314318 @ Override
315319 public CompletionStage <Void > commitTransaction () {
316- return transaction . commit ()
317- .onSuccess ( v -> LOG . tracef ( "Transaction committed: %s" , transaction ) )
318- .onFailure ( v -> LOG .errorf ( "Failed to commit transaction : %s" , transaction ) )
319- .toCompletionStage ( )
320- .whenComplete ( this :: clearTransaction );
320+ return connection . transaction ()
321+ .commit ( )
322+ .onSuccess ( v -> LOG .tracef ( "Transaction committed : %s" , connection . transaction () ) )
323+ .onFailure ( throwable -> LOG . errorf ( "Failed to commit transaction: %s" , throwable . getMessage () ) )
324+ .toCompletionStage ( );
321325 }
322326
323327 @ Override
324328 public CompletionStage <Void > rollbackTransaction () {
325- return transaction .rollback ()
326- .onFailure ( v -> LOG .errorf ( "Failed to rollback transaction: %s" , transaction ) )
327- .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , transaction ) )
328- .toCompletionStage ()
329- .whenComplete ( this ::clearTransaction );
329+ if ( isTransactionInProgress () ) {
330+ return connection .transaction ()
331+ .rollback ()
332+ .onFailure ( throwable -> LOG .errorf ( "Failed to rollback transaction: %s" , throwable .getMessage () ) )
333+ .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , connection .transaction () ) )
334+ .toCompletionStage ();
335+ }
336+ LOG .trace ( "No transaction found to roll back" );
337+ return voidFuture ();
330338 }
331339
332340 @ Override
333341 public CompletionStage <Void > close () {
334- if ( transaction != null ) {
335- throw new IllegalStateException ( "Connection being closed with a live transaction associated to it" );
342+ // We can probably skip the validation if the connection is already closed...but, you never know
343+ return validateNoTransactionInProgressOnClose ()
344+ .handle ( CompletionStages ::handle )
345+ .thenCompose ( validationHandler -> supplyStage ( () -> closed
346+ ? voidFuture ().thenAccept ( v -> LOG .trace ( "Connection already closed" ) )
347+ : connection .close ().toCompletionStage () )
348+ .handle ( CompletionStages ::handle )
349+ .thenCompose ( closeConnectionHandler -> {
350+ if ( closeConnectionHandler .hasFailed () ) {
351+ if ( validationHandler .hasFailed () ) {
352+ // Error closing the connection, include the validation error
353+ closeConnectionHandler .getThrowable ()
354+ .addSuppressed ( validationHandler .getThrowable () );
355+ }
356+ // Return a failed CompletionStage
357+ return closeConnectionHandler .getResultAsCompletionStage ();
358+ }
359+ if ( !closed ) {
360+ closed = true ;
361+ LOG .tracef ( "Connection closed: %s" , connection );
362+ }
363+ else {
364+ LOG .tracef ( "Connection was already closed: %s" , connection );
365+ }
366+ // Connection closed, return the result of the validation
367+ return validationHandler .getResultAsCompletionStage ();
368+ } )
369+ );
370+ }
371+
372+ /**
373+ * If there's a transaction open, roll back it and return a failed CompletionStage.
374+ * The validation error is related to closing the connection.
375+ */
376+ private CompletionStage <Void > validateNoTransactionInProgressOnClose () {
377+ if ( isTransactionInProgress () ) {
378+ return supplyStage ( this ::rollbackTransaction )
379+ .handle ( CompletionStages ::handle )
380+ .thenCompose ( rollbackHandler -> {
381+ final Throwable validationError = LOG .liveTransactionDetectedOnClose ();
382+ if ( rollbackHandler .hasFailed () ) {
383+ // Include the error that happened during rollback
384+ validationError .addSuppressed ( rollbackHandler .getThrowable () );
385+ }
386+ return failedFuture ( validationError );
387+ } );
336388 }
337- return connection .close ()
338- .onSuccess ( event -> LOG .tracef ( "Connection closed: %s" , connection ) )
339- .onFailure ( v -> LOG .errorf ( "Failed to close a connection: %s" , connection ) )
340- .toCompletionStage ();
389+ return voidFuture ();
341390 }
342391
343392 @ SuppressWarnings ("unchecked" )
@@ -377,15 +426,6 @@ private static ResultSet getLastInsertedIdAsResultSet(RowSet<Row> rows, Class<?>
377426 return null ;
378427 }
379428
380- private void setTransaction (Transaction tx ) {
381- transaction = tx ;
382- }
383-
384- private void clearTransaction (Void v , Throwable x ) {
385- LOG .tracef ( "Clearing current transaction instance from connection: %s" , transaction );
386- transaction = null ;
387- }
388-
389429 private static class RowSetResult implements Result {
390430 private final RowSet <Row > rowset ;
391431 private final RowIterator <Row > it ;
@@ -436,5 +476,4 @@ private static void translateNulls(Object[] paramValues) {
436476 }
437477 }
438478 }
439-
440479}
0 commit comments