Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import org.bson.ByteBufNIO;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
Expand Down Expand Up @@ -174,7 +176,11 @@ private void pipeOneBuffer(final AsyncWritableByteChannelAdapter byteChannel, fi
byteChannel.write(byteBuffer.asNIO(), operationContext, new AsyncCompletionHandler<Void>() {
@Override
public void completed(@Nullable final Void t) {
if (byteBuffer.hasRemaining()) {
boolean isNioBuf = byteBuffer instanceof ByteBufNIO;
// in a rare case when ByteBufNIO was already released
// the call to `hasRemaining` may throw an NPE
// so we add this new if condition to catch this case
if ((isNioBuf && byteBuffer.asNIO() != null && byteBuffer.hasRemaining()) || byteBuffer.hasRemaining()) {
byteChannel.write(byteBuffer.asNIO(), operationContext, this);
} else {
outerHandler.completed(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ final class CrudProseTest extends com.mongodb.client.CrudProseTest {
protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoClientSettingsBuilder) {
return new SyncMongoClient(mongoClientSettingsBuilder);
}

@Override
protected MongoClient createMongoClientWithExtendedCloseWait(final MongoClientSettings.Builder mongoClientSettingsBuilder,
final int waitIterations) {
return new SyncMongoClient(mongoClientSettingsBuilder, null, waitIterations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ public SyncMongoClient(final MongoClientSettings.Builder builder) {
}

public SyncMongoClient(final MongoClientSettings.Builder builder, @Nullable final MongoDriverInformation mongoDriverInformation) {
this.connectionPoolCounter = new ConnectionPoolCounter();
this(builder, mongoDriverInformation, ConnectionPoolCounter.DEFAULT_MAX_ITERATIONS);
}

public SyncMongoClient(final MongoClientSettings.Builder builder, @Nullable final MongoDriverInformation mongoDriverInformation,
final int connectionCloseWaitIterations) {
this.connectionPoolCounter = new ConnectionPoolCounter(connectionCloseWaitIterations);
builder.applyToConnectionPoolSettings(b -> b.addConnectionPoolListener(connectionPoolCounter));
this.wrapped = MongoClients.create(builder.build(), mongoDriverInformation);
this.delegate = new SyncMongoCluster(wrapped);
Expand Down Expand Up @@ -345,7 +350,13 @@ public void appendMetadata(final MongoDriverInformation mongoDriverInformation)
}

static class ConnectionPoolCounter implements ConnectionPoolListener {
static final int DEFAULT_MAX_ITERATIONS = 10; // 10 * 200ms = 2 seconds
private final AtomicInteger activeConnections = new AtomicInteger(0);
private final int maxIterations;

ConnectionPoolCounter(final int maxIterations) {
this.maxIterations = maxIterations;
}

@Override
public void connectionCheckedOut(final ConnectionCheckedOutEvent event) {
Expand All @@ -361,15 +372,17 @@ protected void assertConnectionsClosed() {
int activeConnectionsCount = activeConnections.get();
boolean connectionsClosed = activeConnectionsCount == 0;
int counter = 0;
while (counter < 10 && !connectionsClosed) {
// Wait up to maxIterations (default 2 seconds) for connections to close.
// This allows time for long-running operations (like bulk writes with many errors)
// to complete their cleanup after cancellation.
while (counter < maxIterations && !connectionsClosed) {
activeConnectionsCount = activeConnections.get();
connectionsClosed = activeConnectionsCount == 0;
if (!connectionsClosed) {
sleep(200);
counter++;
}
}

Assertions.assertTrue(activeConnectionsCount == 0,
format("Expected all connections to be closed after closing the client. %n"
+ "The connection pool listener reports '%d' open connections.", activeConnectionsCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,12 @@ protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() throws Int
protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordered) {
assumeTrue(serverVersionAtLeast(8, 0));
TestCommandListener commandListener = new TestCommandListener();
try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder()
// Use longer timeout (300 iterations = 60 seconds) as this test involves many write errors
// and the async cleanup takes longer than the default 2 seconds
final int waitIterations = 300;
try (MongoClient client = createMongoClientWithExtendedCloseWait(getMongoClientSettingsBuilder()
.retryWrites(false)
.addCommandListener(commandListener))) {
.addCommandListener(commandListener), waitIterations)) {
int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize");
Document document = new Document("_id", 1);
MongoCollection<Document> collection = droppedCollection(client, Document.class);
Expand Down Expand Up @@ -564,6 +567,16 @@ protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoC
return MongoClients.create(mongoClientSettingsBuilder.build());
}

/**
* Should be ignored by sync driver.
* The only reason for this method to exist is
* to allow async driver to extend the close wait time
*/
protected MongoClient createMongoClientWithExtendedCloseWait(final MongoClientSettings.Builder mongoClientSettingsBuilder,
final int waitIterations) {
return createMongoClient(mongoClientSettingsBuilder);
}

private <TDocument> MongoCollection<TDocument> droppedCollection(final MongoClient client, final Class<TDocument> documentClass) {
return droppedDatabase(client).getCollection(NAMESPACE.getCollectionName(), documentClass);
}
Expand Down