Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,55 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.bson.assertions.Assertions.isTrueArgument;
import static org.bson.assertions.Assertions.notNull;

class CompositeByteBuf implements ByteBuf {
/**
* A composite {@link ByteBuf} that provides a unified view over a list of component buffers.
*
* <h2>ByteBuf Ownership and Reference Counting</h2>
* <p>This class manages the lifecycle of its component buffers with the following rules:</p>
* <ul>
* <li><b>Constructor:</b> Takes buffers as input but does NOT take ownership. The buffers are made
* read-only via {@link ByteBuf#asReadOnly()}, which creates read only duplicates.
* The original buffers remain owned by the caller.</li>
* <li><b>{@link #duplicate()}:</b> Creates a new composite with independent position/limit but calls
* {@link ByteBuf#retain()} on each component, incrementing their reference counts. The duplicate
* owns these retained references and releases them when it is released.</li>
* <li><b>{@link #retain()}:</b> Increments the composite's reference count AND retains all component
* buffers. Each retain() call must be paired with a {@link #release()}.</li>
* <li><b>{@link #release()}:</b> Decrements the composite's reference count AND releases all component
* buffers. When the count reaches 0, subsequent access will throw {@link IllegalStateException}.</li>
* </ul>
*
* <p><strong>Important:</strong> The composite's reference count is independent from its components'
* reference counts, but they are kept in sync via {@link #retain()} and {@link #release()} methods.</p>
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
final class CompositeByteBuf implements ByteBuf {
private final List<Component> components;
private final AtomicInteger referenceCount = new AtomicInteger(1);
private int position;
private int limit;

/**
* Creates a composite buffer from the given list of buffers.
*
* <p><strong>ByteBuf Ownership:</strong> This constructor does NOT take ownership of the input buffers.
* It calls {@link ByteBuf#asReadOnly()} on each buffer, which creates a shallow read-only view that
* shares the same underlying data and reference count as the original. The caller retains ownership
* of the original buffers and is responsible for their lifecycle.</p>
*
* <p>The composite starts with a reference count of 1. When {@link #release()} is called and the
* reference count reaches 0, it does NOT automatically release the original buffers - the caller
* must handle that separately.</p>
*
* @param buffers the list of buffers to compose, must not be null or empty
*/
CompositeByteBuf(final List<ByteBuf> buffers) {
notNull("buffers", buffers);
isTrueArgument("buffer list not empty", !buffers.isEmpty());
Expand All @@ -50,7 +88,9 @@ class CompositeByteBuf implements ByteBuf {
}

private CompositeByteBuf(final CompositeByteBuf from) {
components = from.components;
components = from.components.stream().map(c ->
new Component(c.buffer.retain(), c.offset))
.collect(Collectors.toList());
position = from.position();
limit = from.limit();
}
Expand Down Expand Up @@ -277,6 +317,19 @@ public ByteBuf asReadOnly() {
throw new UnsupportedOperationException();
}

/**
* Creates a duplicate of this composite buffer with independent position and limit.
*
* <p><strong>ByteBuf Ownership:</strong> The duplicate calls {@link ByteBuf#retain()} on each
* component buffer, incrementing their reference counts. The duplicate owns these retained
* references and starts with its own reference count of 1. <strong>The caller is responsible
* for releasing the duplicate</strong> when done, which will release the component buffers.</p>
*
* <p>The duplicate shares the underlying buffer data with this composite but has independent
* reference counting and position/limit state.</p>
*
* @return a new composite buffer that shares data with this one but has independent state
*/
@Override
public ByteBuf duplicate() {
return new CompositeByteBuf(this);
Expand All @@ -300,21 +353,42 @@ public int getReferenceCount() {
return referenceCount.get();
}

/**
* Increments the reference count of this composite buffer and all component buffers.
*
* <p><strong>Important:</strong> This method retains both the composite's reference count and all
* component buffers. If the reference count is already 0, an {@link IllegalStateException} is thrown.
* Note that if an exception is thrown, the component buffers will have been retained before the
* exception occurs.</p>
*
* @return this buffer
* @throws IllegalStateException if the reference count is already 0
*/
@Override
public ByteBuf retain() {
if (referenceCount.incrementAndGet() == 1) {
referenceCount.decrementAndGet();
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
}
components.forEach(c -> c.buffer.retain());
return this;
}

/**
* Decrements the reference count of this composite buffer and all component buffers.
*
* <p><strong>Important:</strong> This method releases both the composite's reference count and all
* component buffers. All component buffers are released even if an exception occurs.</p>
*
* @throws IllegalStateException if the reference count is already 0
*/
@Override
public void release() {
if (referenceCount.decrementAndGet() < 0) {
referenceCount.incrementAndGet();
throw new IllegalStateException("Attempted to decrement the reference count below 0");
}
components.forEach(c -> c.buffer.release());
}

private Component findComponent(final int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class NettyByteBuf implements ByteBuf {

private final AtomicInteger referenceCount = new AtomicInteger(1);
private io.netty.buffer.ByteBuf proxied;
private boolean isWriting = true;

Expand Down Expand Up @@ -271,17 +272,26 @@ public ByteBuffer asNIO() {

@Override
public int getReferenceCount() {
return proxied.refCnt();
return referenceCount.get();
}

@Override
public ByteBuf retain() {
if (referenceCount.incrementAndGet() == 1) {
referenceCount.decrementAndGet();
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
}
proxied.retain();
return this;
}

@Override
public void release() {
int newRefCount = referenceCount.decrementAndGet();
if (newRefCount < 0) {
referenceCount.incrementAndGet();
throw new IllegalStateException("Attempted to decrement the reference count below 0");
}
proxied.release();
}
}
Loading