diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java index 89396dae5d..ec72cc777d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java @@ -41,6 +41,8 @@ import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import org.bson.ByteBufNIO; + /** *

This class is not part of the public API and may be removed or changed at any time

*/ @@ -174,7 +176,11 @@ private void pipeOneBuffer(final AsyncWritableByteChannelAdapter byteChannel, fi byteChannel.write(byteBuffer.asNIO(), operationContext, new AsyncCompletionHandler() { @Override public void completed(@Nullable final Void t) { - if (byteBuffer.hasRemaining()) { + boolean isNioBuf = byteBuffer instanceof ByteBufNIO; + // in a rare case when ByteBufNIO was already released + // the call to `hasRemaining` may throw an NPE + // so we add this new if condition to catch this case + if ((isNioBuf && byteBuffer.asNIO() != null && byteBuffer.hasRemaining()) || byteBuffer.hasRemaining()) { byteChannel.write(byteBuffer.asNIO(), operationContext, this); } else { outerHandler.completed(null); diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java index 07afb94c8d..6dd0ed7c46 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java @@ -28,4 +28,10 @@ final class CrudProseTest extends com.mongodb.client.CrudProseTest { protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoClientSettingsBuilder) { return new SyncMongoClient(mongoClientSettingsBuilder); } + + @Override + protected MongoClient createMongoClientWithExtendedCloseWait(final MongoClientSettings.Builder mongoClientSettingsBuilder, + final int waitIterations) { + return new SyncMongoClient(mongoClientSettingsBuilder, null, waitIterations); + } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java index 348bef110f..ae3825af6b 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java @@ -152,7 +152,12 @@ public SyncMongoClient(final MongoClientSettings.Builder builder) { } public SyncMongoClient(final MongoClientSettings.Builder builder, @Nullable final MongoDriverInformation mongoDriverInformation) { - this.connectionPoolCounter = new ConnectionPoolCounter(); + this(builder, mongoDriverInformation, ConnectionPoolCounter.DEFAULT_MAX_ITERATIONS); + } + + public SyncMongoClient(final MongoClientSettings.Builder builder, @Nullable final MongoDriverInformation mongoDriverInformation, + final int connectionCloseWaitIterations) { + this.connectionPoolCounter = new ConnectionPoolCounter(connectionCloseWaitIterations); builder.applyToConnectionPoolSettings(b -> b.addConnectionPoolListener(connectionPoolCounter)); this.wrapped = MongoClients.create(builder.build(), mongoDriverInformation); this.delegate = new SyncMongoCluster(wrapped); @@ -345,7 +350,13 @@ public void appendMetadata(final MongoDriverInformation mongoDriverInformation) } static class ConnectionPoolCounter implements ConnectionPoolListener { + static final int DEFAULT_MAX_ITERATIONS = 10; // 10 * 200ms = 2 seconds private final AtomicInteger activeConnections = new AtomicInteger(0); + private final int maxIterations; + + ConnectionPoolCounter(final int maxIterations) { + this.maxIterations = maxIterations; + } @Override public void connectionCheckedOut(final ConnectionCheckedOutEvent event) { @@ -361,7 +372,10 @@ protected void assertConnectionsClosed() { int activeConnectionsCount = activeConnections.get(); boolean connectionsClosed = activeConnectionsCount == 0; int counter = 0; - while (counter < 10 && !connectionsClosed) { + // Wait up to maxIterations (default 2 seconds) for connections to close. + // This allows time for long-running operations (like bulk writes with many errors) + // to complete their cleanup after cancellation. + while (counter < maxIterations && !connectionsClosed) { activeConnectionsCount = activeConnections.get(); connectionsClosed = activeConnectionsCount == 0; if (!connectionsClosed) { @@ -369,7 +383,6 @@ protected void assertConnectionsClosed() { counter++; } } - Assertions.assertTrue(activeConnectionsCount == 0, format("Expected all connections to be closed after closing the client. %n" + "The connection pool listener reports '%d' open connections.", activeConnectionsCount)); diff --git a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java index d269a3cad5..efc0049936 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java @@ -243,9 +243,12 @@ protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() throws Int protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordered) { assumeTrue(serverVersionAtLeast(8, 0)); TestCommandListener commandListener = new TestCommandListener(); - try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() + // Use longer timeout (300 iterations = 60 seconds) as this test involves many write errors + // and the async cleanup takes longer than the default 2 seconds + final int waitIterations = 300; + try (MongoClient client = createMongoClientWithExtendedCloseWait(getMongoClientSettingsBuilder() .retryWrites(false) - .addCommandListener(commandListener))) { + .addCommandListener(commandListener), waitIterations)) { int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize"); Document document = new Document("_id", 1); MongoCollection collection = droppedCollection(client, Document.class); @@ -564,6 +567,16 @@ protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoC return MongoClients.create(mongoClientSettingsBuilder.build()); } + /** + * Should be ignored by sync driver. + * The only reason for this method to exist is + * to allow async driver to extend the close wait time + */ + protected MongoClient createMongoClientWithExtendedCloseWait(final MongoClientSettings.Builder mongoClientSettingsBuilder, + final int waitIterations) { + return createMongoClient(mongoClientSettingsBuilder); + } + private MongoCollection droppedCollection(final MongoClient client, final Class documentClass) { return droppedDatabase(client).getCollection(NAMESPACE.getCollectionName(), documentClass); }