diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java
index 89396dae5d..ec72cc777d 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java
@@ -41,6 +41,8 @@
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import org.bson.ByteBufNIO;
+
/**
*
This class is not part of the public API and may be removed or changed at any time
*/
@@ -174,7 +176,11 @@ private void pipeOneBuffer(final AsyncWritableByteChannelAdapter byteChannel, fi
byteChannel.write(byteBuffer.asNIO(), operationContext, new AsyncCompletionHandler() {
@Override
public void completed(@Nullable final Void t) {
- if (byteBuffer.hasRemaining()) {
+ boolean isNioBuf = byteBuffer instanceof ByteBufNIO;
+ // in a rare case when ByteBufNIO was already released
+ // the call to `hasRemaining` may throw an NPE
+ // so we add this new if condition to catch this case
+ if ((isNioBuf && byteBuffer.asNIO() != null && byteBuffer.hasRemaining()) || byteBuffer.hasRemaining()) {
byteChannel.write(byteBuffer.asNIO(), operationContext, this);
} else {
outerHandler.completed(null);
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java
index 07afb94c8d..6dd0ed7c46 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java
@@ -28,4 +28,10 @@ final class CrudProseTest extends com.mongodb.client.CrudProseTest {
protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoClientSettingsBuilder) {
return new SyncMongoClient(mongoClientSettingsBuilder);
}
+
+ @Override
+ protected MongoClient createMongoClientWithExtendedCloseWait(final MongoClientSettings.Builder mongoClientSettingsBuilder,
+ final int waitIterations) {
+ return new SyncMongoClient(mongoClientSettingsBuilder, null, waitIterations);
+ }
}
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java
index 348bef110f..ae3825af6b 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java
@@ -152,7 +152,12 @@ public SyncMongoClient(final MongoClientSettings.Builder builder) {
}
public SyncMongoClient(final MongoClientSettings.Builder builder, @Nullable final MongoDriverInformation mongoDriverInformation) {
- this.connectionPoolCounter = new ConnectionPoolCounter();
+ this(builder, mongoDriverInformation, ConnectionPoolCounter.DEFAULT_MAX_ITERATIONS);
+ }
+
+ public SyncMongoClient(final MongoClientSettings.Builder builder, @Nullable final MongoDriverInformation mongoDriverInformation,
+ final int connectionCloseWaitIterations) {
+ this.connectionPoolCounter = new ConnectionPoolCounter(connectionCloseWaitIterations);
builder.applyToConnectionPoolSettings(b -> b.addConnectionPoolListener(connectionPoolCounter));
this.wrapped = MongoClients.create(builder.build(), mongoDriverInformation);
this.delegate = new SyncMongoCluster(wrapped);
@@ -345,7 +350,13 @@ public void appendMetadata(final MongoDriverInformation mongoDriverInformation)
}
static class ConnectionPoolCounter implements ConnectionPoolListener {
+ static final int DEFAULT_MAX_ITERATIONS = 10; // 10 * 200ms = 2 seconds
private final AtomicInteger activeConnections = new AtomicInteger(0);
+ private final int maxIterations;
+
+ ConnectionPoolCounter(final int maxIterations) {
+ this.maxIterations = maxIterations;
+ }
@Override
public void connectionCheckedOut(final ConnectionCheckedOutEvent event) {
@@ -361,7 +372,10 @@ protected void assertConnectionsClosed() {
int activeConnectionsCount = activeConnections.get();
boolean connectionsClosed = activeConnectionsCount == 0;
int counter = 0;
- while (counter < 10 && !connectionsClosed) {
+ // Wait up to maxIterations (default 2 seconds) for connections to close.
+ // This allows time for long-running operations (like bulk writes with many errors)
+ // to complete their cleanup after cancellation.
+ while (counter < maxIterations && !connectionsClosed) {
activeConnectionsCount = activeConnections.get();
connectionsClosed = activeConnectionsCount == 0;
if (!connectionsClosed) {
@@ -369,7 +383,6 @@ protected void assertConnectionsClosed() {
counter++;
}
}
-
Assertions.assertTrue(activeConnectionsCount == 0,
format("Expected all connections to be closed after closing the client. %n"
+ "The connection pool listener reports '%d' open connections.", activeConnectionsCount));
diff --git a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java
index d269a3cad5..efc0049936 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java
@@ -243,9 +243,12 @@ protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() throws Int
protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordered) {
assumeTrue(serverVersionAtLeast(8, 0));
TestCommandListener commandListener = new TestCommandListener();
- try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
+ // Use longer timeout (300 iterations = 60 seconds) as this test involves many write errors
+ // and the async cleanup takes longer than the default 2 seconds
+ final int waitIterations = 300;
+ try (MongoClient client = createMongoClientWithExtendedCloseWait(getMongoClientSettingsBuilder()
.retryWrites(false)
- .addCommandListener(commandListener))) {
+ .addCommandListener(commandListener), waitIterations)) {
int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize");
Document document = new Document("_id", 1);
MongoCollection collection = droppedCollection(client, Document.class);
@@ -564,6 +567,16 @@ protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoC
return MongoClients.create(mongoClientSettingsBuilder.build());
}
+ /**
+ * Should be ignored by sync driver.
+ * The only reason for this method to exist is
+ * to allow async driver to extend the close wait time
+ */
+ protected MongoClient createMongoClientWithExtendedCloseWait(final MongoClientSettings.Builder mongoClientSettingsBuilder,
+ final int waitIterations) {
+ return createMongoClient(mongoClientSettingsBuilder);
+ }
+
private MongoCollection droppedCollection(final MongoClient client, final Class documentClass) {
return droppedDatabase(client).getCollection(NAMESPACE.getCollectionName(), documentClass);
}