2020import org .hibernate .engine .jdbc .spi .SqlExceptionHelper ;
2121import org .hibernate .engine .jdbc .spi .SqlStatementLogger ;
2222import org .hibernate .reactive .adaptor .impl .ResultSetAdaptor ;
23+ import org .hibernate .reactive .logging .impl .Log ;
24+ import org .hibernate .reactive .logging .impl .LoggerFactory ;
2325import org .hibernate .reactive .pool .ReactiveConnection ;
2426import org .hibernate .reactive .pool .ReactiveConnectionPool ;
2527
3335import io .vertx .sqlclient .Tuple ;
3436import io .vertx .sqlclient .spi .DatabaseMetadata ;
3537
38+ import static java .lang .invoke .MethodHandles .lookup ;
39+ import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
3640import static org .hibernate .reactive .util .impl .CompletionStages .rethrow ;
3741import static org .hibernate .reactive .util .impl .CompletionStages .voidFuture ;
3842
@@ -227,11 +231,13 @@ private SqlClientConnection newConnection(SqlConnection connection, SqlException
227231
228232 private static class ProxyConnection implements ReactiveConnection {
229233
234+ private static final Log LOG = LoggerFactory .make ( Log .class , lookup () );
235+
230236 private static final VarHandle OPENED_HANDLE ;
231237
232238 static {
233239 try {
234- MethodHandles .Lookup lookup = MethodHandles . lookup ();
240+ MethodHandles .Lookup lookup = lookup ();
235241 OPENED_HANDLE = lookup .findVarHandle ( ProxyConnection .class , "opened" , boolean .class );
236242 }
237243 catch (ReflectiveOperationException e ) {
@@ -242,6 +248,7 @@ private static class ProxyConnection implements ReactiveConnection {
242248 private final Supplier <CompletionStage <ReactiveConnection >> connectionSupplier ;
243249 private final CompletableFuture <ReactiveConnection > connectionFuture = new CompletableFuture <>();
244250 private volatile boolean opened = false ;
251+ private volatile boolean closed = false ;
245252
246253 public ProxyConnection (Supplier <CompletionStage <ReactiveConnection >> connectionSupplier ) {
247254 this .connectionSupplier = connectionSupplier ;
@@ -251,6 +258,9 @@ public ProxyConnection(Supplier<CompletionStage<ReactiveConnection>> connectionS
251258 * @return the existing {@link ReactiveConnection}, or open a new one
252259 */
253260 private CompletionStage <ReactiveConnection > connection () {
261+ if ( closed ) {
262+ return failedFuture ( LOG .connectionIsClosed () );
263+ }
254264 if ( opened ) {
255265 return connectionFuture ;
256266 }
@@ -275,10 +285,13 @@ public boolean isTransactionInProgress() {
275285
276286 @ Override
277287 public DatabaseMetadata getDatabaseMetadata () {
278- return Objects .requireNonNull (
279- connectionFuture .getNow ( null ),
280- "Database metadata not available until the connection is opened"
281- ).getDatabaseMetadata ();
288+ if ( closed ) {
289+ throw LOG .connectionIsClosed ();
290+ }
291+
292+ return Objects
293+ .requireNonNull ( connectionFuture .getNow ( null ), "Database metadata not available until a connection has been created" )
294+ .getDatabaseMetadata ();
282295 }
283296
284297 @ Override
@@ -386,10 +399,22 @@ public CompletionStage<Void> rollbackTransaction() {
386399 return connection ().thenCompose ( ReactiveConnection ::rollbackTransaction );
387400 }
388401
389- @ Override
390- public ProxyConnection withBatchSize (int batchSize ) {
391- connectionFuture .thenApply ( reactiveConnection -> reactiveConnection .withBatchSize ( batchSize ) );
392- return this ;
402+ public ReactiveConnection withBatchSize (int batchSize ) {
403+ if ( closed ) {
404+ throw LOG .connectionIsClosed ();
405+ }
406+
407+ if ( connectionFuture .isDone () ) {
408+ // connection exists, we can let callers use the delegate and forget about the proxy.
409+ return connectionFuture .getNow ( null ).withBatchSize ( batchSize );
410+ }
411+ else {
412+ return new ProxyConnection ( () -> opened
413+ // Connection has been requested but not created yet
414+ ? connectionFuture .thenApply ( c -> c .withBatchSize ( batchSize ) )
415+ // Connection has not been requested
416+ : connectionSupplier .get ().thenApply ( c -> c .withBatchSize ( batchSize ) ) );
417+ }
393418 }
394419
395420 @ Override
@@ -399,8 +424,9 @@ public CompletionStage<Void> executeBatch() {
399424
400425 @ Override
401426 public CompletionStage <Void > close () {
427+ closed = true ;
402428 return opened
403- ? connectionFuture .getNow ( null ). close ( )
429+ ? connectionFuture .thenCompose ( ReactiveConnection :: close )
404430 : voidFuture ();
405431 }
406432 }
0 commit comments