diff --git a/NOTICE b/NOTICE index 6fd09f3c5..25eac9b38 100644 --- a/NOTICE +++ b/NOTICE @@ -11,6 +11,7 @@ Notice - https://github.com/databricks/databricks-sdk-java/blob/main/NOTICE apache/arrow - https://github.com/apache/arrow/tree/main Copyright 2016-2025 The Apache Software Foundation +*This software contains code modified by Databricks, Inc.* Notice - https://github.com/apache/arrow/blob/main/NOTICE.txt diffplug/spotless - https://github.com/diffplug/spotless/tree/main diff --git a/pom.xml b/pom.xml index de74d9468..d152b771a 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ UTF-8 UTF-8 - 17.0.0 + 18.3.0 3.18.0 11 11 diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java index 93f366549..036c02b0b 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractArrowResultChunk.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; @@ -99,7 +98,7 @@ protected AbstractArrowResultChunk( this.rowOffset = rowOffset; this.chunkIndex = chunkIndex; this.statementId = statementId; - this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); + this.rootAllocator = ArrowBufferAllocator.getBufferAllocator(); this.chunkReadyFuture = new CompletableFuture<>(); this.chunkLink = chunkLink; this.expiryTime = expiryTime; diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowBufferAllocator.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowBufferAllocator.java new file mode 100644 index 000000000..5875e3ee8 --- /dev/null +++ 
b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowBufferAllocator.java @@ -0,0 +1,56 @@ +package com.databricks.jdbc.api.impl.arrow; + +import com.databricks.jdbc.log.JdbcLogger; +import com.databricks.jdbc.log.JdbcLoggerFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.DatabricksBufferAllocator; +import org.apache.arrow.memory.RootAllocator; + +/** + * Creates {@link BufferAllocator} instances. + * + *

First tries to create a {@link RootAllocator} which uses off-heap memory and is faster. If + * that fails (usually due to JVM reflection restrictions), falls back to {@link + * DatabricksBufferAllocator} which uses heap memory. + */ +public class ArrowBufferAllocator { + /** Can a {@code RootAllocator} be created in this JVM instance? */ + private static final boolean canCreateRootAllocator; + + /** Logger instance. */ + private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ArrowBufferAllocator.class); + + /* Check if the RootAllocator can be instantiated. */ + static { + RootAllocator rootAllocator = null; + try { + rootAllocator = new RootAllocator(); + } catch (Throwable t) { + LOGGER.info( + "Failed to create RootAllocator, will use DatabricksBufferAllocator as fallback: " + + t.getMessage()); + } + + canCreateRootAllocator = rootAllocator != null; + if (rootAllocator != null) { + try { + rootAllocator.close(); + } catch (Throwable t) { + LOGGER.warn("RootAllocator could not be closed: " + t.getMessage()); + } + } + } + + /** + * @return an instance of the {@code BufferAllocator}. + */ + public static BufferAllocator getBufferAllocator() { + // TODO reuse RootAllocators? + // TODO should this method be non-static. + if (canCreateRootAllocator) { + return new RootAllocator(); + } else { + return new DatabricksBufferAllocator(); + } + } +} diff --git a/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/src/main/java/org/apache/arrow/memory/ArrowBuf.java new file mode 100644 index 000000000..872bbdf10 --- /dev/null +++ b/src/main/java/org/apache/arrow/memory/ArrowBuf.java @@ -0,0 +1,1265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ------------------------------------------------------------------------- +// MODIFICATION NOTICE: +// This file was modified by Databricks, Inc on 16-December-2025. +// Description of changes: +// - Patched ArrowBuf to be non-final and extensible. +// - Patched ArrowBuf to remove dependency on BaseAllocator. +// ------------------------------------------------------------------------- + +package org.apache.arrow.memory; + +import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.ReadOnlyBufferException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.arrow.memory.BaseAllocator.Verbosity; +import org.apache.arrow.memory.util.CommonUtil; +import org.apache.arrow.memory.util.HistoricalLog; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.util.VisibleForTesting; + +/** + * ArrowBuf serves as a facade over underlying memory by providing several access APIs to read/write + * data into a chunk of direct memory. All the accounting, ownership and reference management is + * done by {@link ReferenceManager} and ArrowBuf can work with a custom user provided implementation + * of ReferenceManager + * + *

Two important instance variables of an ArrowBuf: (1) address - starting virtual address in the + * underlying memory chunk that this ArrowBuf has access to (2) length - length (in bytes) in the + * underlying memory chunk that this ArrowBuf has access to + * + *

The management (allocation, deallocation, reference counting etc) for the memory chunk is not + * done by ArrowBuf. Default implementation of ReferenceManager, allocation is in {@link + * BaseAllocator}, {@link BufferLedger} and {@link AllocationManager} + */ +public class ArrowBuf implements AutoCloseable { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class); + + // ---- Databricks patch start ---- + // ---- Copied verbatim from BaseAllocator. We avoid initializing static fields of BaseAllocator + // ---- to avoid unsafe allocation initialization errors. + public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator"; + public static final int DEBUG_LOG_LENGTH = 6; + public static final boolean DEBUG; + + // Initialize this before DEFAULT_CONFIG as DEFAULT_CONFIG will eventually initialize the + // allocation manager, + // which in turn allocates an ArrowBuf, which requires DEBUG to have been properly initialized + static { + // the system property takes precedence. + String propValue = System.getProperty(DEBUG_ALLOCATOR); + if (propValue != null) { + DEBUG = Boolean.parseBoolean(propValue); + } else { + DEBUG = false; + } + logger.info( + "Debug mode " + + (DEBUG + ? "enabled." + : "disabled. 
Enable with the VM option -Darrow.memory.debug.allocator=true.")); + } + + // ---- Databricks patch end ---- + + private static final int SHORT_SIZE = Short.BYTES; + private static final int INT_SIZE = Integer.BYTES; + private static final int FLOAT_SIZE = Float.BYTES; + private static final int DOUBLE_SIZE = Double.BYTES; + private static final int LONG_SIZE = Long.BYTES; + + private static final AtomicLong idGenerator = new AtomicLong(0); + private static final int LOG_BYTES_PER_ROW = 10; + private final long id = idGenerator.incrementAndGet(); + private final ReferenceManager referenceManager; + private final BufferManager bufferManager; + private final long addr; + private long readerIndex; + private long writerIndex; + + // ---- Databricks patch start ---- + private final HistoricalLog historicalLog = + DEBUG ? new HistoricalLog(DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null; + // ---- Databricks patch end ---- + + private volatile long capacity; + + /** + * Constructs a new ArrowBuf. + * + * @param referenceManager The memory manager to track memory usage and reference count of this + * buffer + * @param capacity The capacity in bytes of this buffer + */ + public ArrowBuf( + final ReferenceManager referenceManager, + final BufferManager bufferManager, + final long capacity, + final long memoryAddress) { + this.referenceManager = referenceManager; + this.bufferManager = bufferManager; + this.addr = memoryAddress; + this.capacity = capacity; + this.readerIndex = 0; + this.writerIndex = 0; + if (historicalLog != null) { + historicalLog.recordEvent("create()"); + } + } + + public int refCnt() { + return referenceManager.getRefCount(); + } + + /** + * Allows a function to determine whether not reading a particular string of bytes is valid. + * + *

Will throw an exception if the memory is not readable for some reason. Only doesn't + * something in the case that AssertionUtil.BOUNDS_CHECKING_ENABLED is true. + * + * @param start The starting position of the bytes to be read. + * @param end The exclusive endpoint of the bytes to be read. + */ + public void checkBytes(long start, long end) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + checkIndexD(start, end - start); + } + } + + /** For get/set operations, reference count should be >= 1. */ + private void ensureAccessible() { + if (this.refCnt() == 0) { + throw new IllegalStateException("Ref count should be >= 1 for accessing the ArrowBuf"); + } + } + + /** + * Get reference manager for this ArrowBuf. + * + * @return user provided implementation of {@link ReferenceManager} + */ + public ReferenceManager getReferenceManager() { + return referenceManager; + } + + public long capacity() { + return capacity; + } + + /** + * Adjusts the capacity of this buffer. Size increases are NOT supported. + * + * @param newCapacity Must be in in the range [0, length). + */ + public synchronized ArrowBuf capacity(long newCapacity) { + + if (newCapacity == capacity) { + return this; + } + + Preconditions.checkArgument(newCapacity >= 0); + + if (newCapacity < capacity) { + capacity = newCapacity; + return this; + } + + throw new UnsupportedOperationException( + "Buffers don't support resizing that increases the size."); + } + + /** Returns the byte order of elements in this buffer. */ + public ByteOrder order() { + return ByteOrder.nativeOrder(); + } + + /** Returns the number of bytes still available to read in this buffer. */ + public long readableBytes() { + Preconditions.checkState( + writerIndex >= readerIndex, "Writer index cannot be less than reader index"); + return writerIndex - readerIndex; + } + + /** + * Returns the number of bytes still available to write into this buffer before capacity is + * reached. 
+ */ + public long writableBytes() { + return capacity() - writerIndex; + } + + /** Returns a slice of only the readable bytes in the buffer. */ + public ArrowBuf slice() { + return slice(readerIndex, readableBytes()); + } + + /** Returns a slice (view) starting at index with the given length. */ + public ArrowBuf slice(long index, long length) { + + Preconditions.checkPositionIndex(index, this.capacity); + Preconditions.checkPositionIndex(index + length, this.capacity); + + /* + * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects + * .html#wiki-h3-5, which + * explains that derived buffers share their reference count with their parent + */ + final ArrowBuf newBuf = referenceManager.deriveBuffer(this, index, length); + newBuf.writerIndex(length); + return newBuf; + } + + /** Make a nio byte buffer from this arrowbuf. */ + public ByteBuffer nioBuffer() { + return nioBuffer(readerIndex, checkedCastToInt(readableBytes())); + } + + /** Make a nio byte buffer from this ArrowBuf. */ + public ByteBuffer nioBuffer(long index, int length) { + chk(index, length); + return getDirectBuffer(index, length); + } + + private ByteBuffer getDirectBuffer(long index, int length) { + long address = addr(index); + return MemoryUtil.directBuffer(address, length); + } + + public long memoryAddress() { + return this.addr; + } + + @Override + public String toString() { + return String.format("ArrowBuf[%d], address:%d, capacity:%d", id, memoryAddress(), capacity); + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + // identity equals only. + return this == obj; + } + + /* + * IMPORTANT NOTE + * The data getters and setters work with a caller provided + * index. 
This index is 0 based and since ArrowBuf has access + * to a portion of underlying chunk of memory starting at + * some address, we convert the given relative index into + * absolute index as memory address + index. + * + * Example: + * + * Let's say we have an underlying chunk of memory of length 64 bytes + * Now let's say we have an ArrowBuf that has access to the chunk + * from offset 4 for length of 16 bytes. + * + * If the starting virtual address of chunk is MAR, then memory + * address of this ArrowBuf is MAR + offset -- this is what is stored + * in variable addr. See the BufferLedger and AllocationManager code + * for the implementation of ReferenceManager that manages a + * chunk of memory and creates ArrowBuf with access to a range of + * bytes within the chunk (or the entire chunk) + * + * So now to get/set data, we will do => addr + index + * This logic is put in method addr(index) and is frequently + * used in get/set data methods to compute the absolute + * byte address for get/set operation in the underlying chunk + * + * @param index the index at which we the user wants to read/write + * @return the absolute address within the memory + */ + private long addr(long index) { + return addr + index; + } + + /*-------------------------------------------------* + | Following are a set of fast path data set and | + | get APIs to write/read data from ArrowBuf | + | at a given index (0 based relative to this | + | ArrowBuf and not relative to the underlying | + | memory chunk). | + | | + *-------------------------------------------------*/ + + /** + * Helper function to do bounds checking at a particular index for particular length of data. 
+ * + * @param index index (0 based relative to this ArrowBuf) + * @param length provided length of data for get/set + */ + private void chk(long index, long length) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + checkIndexD(index, length); + } + } + + private void checkIndexD(long index, long fieldLength) { + // check reference count + ensureAccessible(); + // check bounds + Preconditions.checkArgument(fieldLength >= 0, "expecting non-negative data length"); + if (index < 0 || index > capacity() - fieldLength) { + if (historicalLog != null) { + historicalLog.logHistory(logger); + } + throw new IndexOutOfBoundsException( + String.format( + "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); + } + } + + /** + * Get long value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return 8 byte long value + */ + public long getLong(long index) { + chk(index, LONG_SIZE); + return MemoryUtil.getLong(addr(index)); + } + + /** + * Set long value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setLong(long index, long value) { + chk(index, LONG_SIZE); + MemoryUtil.putLong(addr(index), value); + } + + /** + * Get float value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return 4 byte float value + */ + public float getFloat(long index) { + return Float.intBitsToFloat(getInt(index)); + } + + /** + * Set float value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. 
+ * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setFloat(long index, float value) { + chk(index, FLOAT_SIZE); + MemoryUtil.putInt(addr(index), Float.floatToRawIntBits(value)); + } + + /** + * Get double value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return 8 byte double value + */ + public double getDouble(long index) { + return Double.longBitsToDouble(getLong(index)); + } + + /** + * Set double value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setDouble(long index, double value) { + chk(index, DOUBLE_SIZE); + MemoryUtil.putLong(addr(index), Double.doubleToRawLongBits(value)); + } + + /** + * Get char value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return 2 byte char value + */ + public char getChar(long index) { + return (char) getShort(index); + } + + /** + * Set char value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setChar(long index, int value) { + chk(index, SHORT_SIZE); + MemoryUtil.putShort(addr(index), (short) value); + } + + /** + * Get int value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. 
+ * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return 4 byte int value + */ + public int getInt(long index) { + chk(index, INT_SIZE); + return MemoryUtil.getInt(addr(index)); + } + + /** + * Set int value at a particular index in the underlying memory chunk this ArrowBuf has access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setInt(long index, int value) { + chk(index, INT_SIZE); + MemoryUtil.putInt(addr(index), value); + } + + /** + * Get short value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return 2 byte short value + */ + public short getShort(long index) { + chk(index, SHORT_SIZE); + return MemoryUtil.getShort(addr(index)); + } + + /** + * Set short value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setShort(long index, int value) { + setShort(index, (short) value); + } + + /** + * Set short value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setShort(long index, short value) { + chk(index, SHORT_SIZE); + MemoryUtil.putShort(addr(index), value); + } + + /** + * Set byte value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. 
+ * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setByte(long index, int value) { + chk(index, 1); + MemoryUtil.putByte(addr(index), (byte) value); + } + + /** + * Set byte value at a particular index in the underlying memory chunk this ArrowBuf has access + * to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be written + * @param value value to write + */ + public void setByte(long index, byte value) { + chk(index, 1); + MemoryUtil.putByte(addr(index), value); + } + + /** + * Get byte value stored at a particular index in the underlying memory chunk this ArrowBuf has + * access to. + * + * @param index index (0 based relative to this ArrowBuf) where the value will be read from + * @return byte value + */ + public byte getByte(long index) { + chk(index, 1); + return MemoryUtil.getByte(addr(index)); + } + + /*--------------------------------------------------* + | Following are another set of data set APIs | + | that directly work with writerIndex | + | | + *--------------------------------------------------*/ + + /** + * Helper function to do bound checking w.r.t writerIndex by checking if we can set "length" bytes + * of data at the writerIndex in this ArrowBuf. + * + * @param length provided length of data for set + */ + private void ensureWritable(final int length) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + Preconditions.checkArgument(length >= 0, "expecting non-negative length"); + // check reference count + this.ensureAccessible(); + // check bounds + if (length > writableBytes()) { + throw new IndexOutOfBoundsException( + String.format( + "writerIndex(%d) + length(%d) exceeds capacity(%d)", + writerIndex, length, capacity())); + } + } + } + + /** + * Helper function to do bound checking w.r.t readerIndex by checking if we can read "length" + * bytes of data at the readerIndex in this ArrowBuf. 
+ * + * @param length provided length of data for get + */ + private void ensureReadable(final int length) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + Preconditions.checkArgument(length >= 0, "expecting non-negative length"); + // check reference count + this.ensureAccessible(); + // check bounds + if (length > readableBytes()) { + throw new IndexOutOfBoundsException( + String.format( + "readerIndex(%d) + length(%d) exceeds writerIndex(%d)", + readerIndex, length, writerIndex)); + } + } + } + + /** + * Read the byte at readerIndex. + * + * @return byte value + */ + public byte readByte() { + ensureReadable(1); + final byte b = getByte(readerIndex); + ++readerIndex; + return b; + } + + /** + * Read dst.length bytes at readerIndex into dst byte array. + * + * @param dst byte array where the data will be written + */ + public void readBytes(byte[] dst) { + Preconditions.checkArgument(dst != null, "expecting valid dst bytearray"); + ensureReadable(dst.length); + getBytes(readerIndex, dst, 0, checkedCastToInt(dst.length)); + } + + /** + * Set the provided byte value at the writerIndex. + * + * @param value value to set + */ + public void writeByte(byte value) { + ensureWritable(1); + MemoryUtil.putByte(addr(writerIndex), value); + ++writerIndex; + } + + /** + * Set the lower order byte for the provided value at the writerIndex. + * + * @param value value to be set + */ + public void writeByte(int value) { + ensureWritable(1); + MemoryUtil.putByte(addr(writerIndex), (byte) value); + ++writerIndex; + } + + /** + * Write the bytes from given byte array into this ArrowBuf starting at writerIndex. + * + * @param src src byte array + */ + public void writeBytes(byte[] src) { + Preconditions.checkArgument(src != null, "expecting valid src array"); + writeBytes(src, 0, src.length); + } + + /** + * Write the bytes from given byte array starting at srcIndex into this ArrowBuf starting at + * writerIndex. 
+ * + * @param src src byte array + * @param srcIndex index in the byte array where the copy will being from + * @param length length of data to copy + */ + public void writeBytes(byte[] src, int srcIndex, int length) { + ensureWritable(length); + setBytes(writerIndex, src, srcIndex, length); + writerIndex += length; + } + + /** + * Set the provided int value as short at the writerIndex. + * + * @param value value to set + */ + public void writeShort(int value) { + ensureWritable(SHORT_SIZE); + MemoryUtil.putShort(addr(writerIndex), (short) value); + writerIndex += SHORT_SIZE; + } + + /** + * Set the provided int value at the writerIndex. + * + * @param value value to set + */ + public void writeInt(int value) { + ensureWritable(INT_SIZE); + MemoryUtil.putInt(addr(writerIndex), value); + writerIndex += INT_SIZE; + } + + /** + * Set the provided long value at the writerIndex. + * + * @param value value to set + */ + public void writeLong(long value) { + ensureWritable(LONG_SIZE); + MemoryUtil.putLong(addr(writerIndex), value); + writerIndex += LONG_SIZE; + } + + /** + * Set the provided float value at the writerIndex. + * + * @param value value to set + */ + public void writeFloat(float value) { + ensureWritable(FLOAT_SIZE); + MemoryUtil.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); + writerIndex += FLOAT_SIZE; + } + + /** + * Set the provided double value at the writerIndex. 
+ * + * @param value value to set + */ + public void writeDouble(double value) { + ensureWritable(DOUBLE_SIZE); + MemoryUtil.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); + writerIndex += DOUBLE_SIZE; + } + + /*--------------------------------------------------* + | Following are another set of data set/get APIs | + | that read and write stream of bytes from/to byte | + | arrays, ByteBuffer, ArrowBuf etc | + | | + *--------------------------------------------------*/ + + /** + * Determine if the requested {@code index} and {@code length} will fit within {@code capacity}. + * + * @param index The starting index. + * @param length The length which will be utilized (starting from {@code index}). + * @param capacity The capacity that {@code index + length} is allowed to be within. + * @return {@code true} if the requested {@code index} and {@code length} will fit within {@code + * capacity}. {@code false} if this would result in an index out of bounds exception. + */ + private static boolean isOutOfBounds(long index, long length, long capacity) { + return (index | length | (index + length) | (capacity - (index + length))) < 0; + } + + private void checkIndex(long index, long fieldLength) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + // check reference count + this.ensureAccessible(); + // check bounds + if (isOutOfBounds(index, fieldLength, this.capacity())) { + throw new IndexOutOfBoundsException( + String.format( + "index: %d, length: %d (expected: range(0, %d))", + index, fieldLength, this.capacity())); + } + } + } + + /** + * Copy data from this ArrowBuf at a given index in into destination byte array. + * + * @param index starting index (0 based relative to the portion of memory) this ArrowBuf has + * access to + * @param dst byte array to copy the data into + */ + public void getBytes(long index, byte[] dst) { + getBytes(index, dst, 0, dst.length); + } + + /** + * Copy data from this ArrowBuf at a given index into destination byte array. 
+ * + * @param index index (0 based relative to the portion of memory this ArrowBuf has access to) + * @param dst byte array to copy the data into + * @param dstIndex starting index in dst byte array to copy into + * @param length length of data to copy from this ArrowBuf + */ + public void getBytes(long index, byte[] dst, int dstIndex, int length) { + // bound check for this ArrowBuf where the data will be copied from + checkIndex(index, length); + // null check + Preconditions.checkArgument(dst != null, "expecting a valid dst byte array"); + // bound check for dst byte array where the data will be copied to + if (isOutOfBounds(dstIndex, length, dst.length)) { + // not enough space to copy "length" bytes into dst array from dstIndex onwards + throw new IndexOutOfBoundsException( + "Not enough space to copy data into destination" + dstIndex); + } + if (length != 0) { + // copy "length" bytes from this ArrowBuf starting at addr(index) address + // into dst byte array at dstIndex onwards + MemoryUtil.copyFromMemory(addr(index), dst, dstIndex, length); + } + } + + /** + * Copy data from a given byte array into this ArrowBuf starting at a given index. + * + * @param index starting index (0 based relative to the portion of memory) this ArrowBuf has + * access to + * @param src byte array to copy the data from + */ + public void setBytes(long index, byte[] src) { + setBytes(index, src, 0, src.length); + } + + /** + * Copy data from a given byte array starting at the given source index into this ArrowBuf at a + * given index. 
+ * + * @param index index (0 based relative to the portion of memory this ArrowBuf has access to) + * @param src src byte array to copy the data from + * @param srcIndex index in the byte array where the copy will start from + * @param length length of data to copy from byte array + */ + public void setBytes(long index, byte[] src, int srcIndex, long length) { + // bound check for this ArrowBuf where the data will be copied into + checkIndex(index, length); + // null check + Preconditions.checkArgument(src != null, "expecting a valid src byte array"); + // bound check for src byte array where the data will be copied from + if (isOutOfBounds(srcIndex, length, src.length)) { + // not enough space to copy "length" bytes into dst array from dstIndex onwards + throw new IndexOutOfBoundsException( + "Not enough space to copy data from byte array" + srcIndex); + } + if (length > 0) { + // copy "length" bytes from src byte array at the starting index (srcIndex) + // into this ArrowBuf starting at address "addr(index)" + MemoryUtil.copyToMemory(src, srcIndex, addr(index), length); + } + } + + /** + * Copy data from this ArrowBuf at a given index into the destination ByteBuffer. 
+ * + * @param index index (0 based relative to the portion of memory this ArrowBuf has access to) + * @param dst dst ByteBuffer where the data will be copied into + */ + public void getBytes(long index, ByteBuffer dst) { + // bound check for this ArrowBuf where the data will be copied from + checkIndex(index, dst.remaining()); + // dst.remaining() bytes of data will be copied into dst ByteBuffer + if (dst.remaining() != 0) { + // address in this ArrowBuf where the copy will begin from + final long srcAddress = addr(index); + if (dst.isDirect()) { + if (dst.isReadOnly()) { + throw new ReadOnlyBufferException(); + } + // copy dst.remaining() bytes of data from this ArrowBuf starting + // at address srcAddress into the dst ByteBuffer starting at + // address dstAddress + final long dstAddress = MemoryUtil.getByteBufferAddress(dst) + dst.position(); + MemoryUtil.copyMemory(srcAddress, dstAddress, dst.remaining()); + // after copy, bump the next write position for the dst ByteBuffer + dst.position(dst.position() + dst.remaining()); + } else if (dst.hasArray()) { + // copy dst.remaining() bytes of data from this ArrowBuf starting + // at address srcAddress into the dst ByteBuffer starting at + // index dstIndex + final int dstIndex = dst.arrayOffset() + dst.position(); + MemoryUtil.copyFromMemory(srcAddress, dst.array(), dstIndex, dst.remaining()); + // after copy, bump the next write position for the dst ByteBuffer + dst.position(dst.position() + dst.remaining()); + } else { + throw new UnsupportedOperationException( + "Copy from this ArrowBuf to ByteBuffer is not supported"); + } + } + } + + /** + * Copy data into this ArrowBuf at a given index onwards from a source ByteBuffer. 
+ * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param src src ByteBuffer where the data will be copied from + */ + public void setBytes(long index, ByteBuffer src) { + // bound check for this ArrowBuf where the data will be copied into + checkIndex(index, src.remaining()); + // length of data to copy + int length = src.remaining(); + // address in this ArrowBuf where the data will be copied to + long dstAddress = addr(index); + if (length != 0) { + if (src.isDirect()) { + // copy src.remaining() bytes of data from src ByteBuffer starting at + // address srcAddress into this ArrowBuf starting at address dstAddress + final long srcAddress = MemoryUtil.getByteBufferAddress(src) + src.position(); + MemoryUtil.copyMemory(srcAddress, dstAddress, length); + // after copy, bump the next read position for the src ByteBuffer + src.position(src.position() + length); + } else if (src.hasArray()) { + // copy src.remaining() bytes of data from src ByteBuffer starting at + // index srcIndex into this ArrowBuf starting at address dstAddress + final int srcIndex = src.arrayOffset() + src.position(); + MemoryUtil.copyToMemory(src.array(), srcIndex, dstAddress, length); + // after copy, bump the next read position for the src ByteBuffer + src.position(src.position() + length); + } else { + final ByteOrder originalByteOrder = src.order(); + src.order(order()); + try { + // copy word at a time + while (length - 128 >= LONG_SIZE) { + for (int x = 0; x < 16; x++) { + MemoryUtil.putLong(dstAddress, src.getLong()); + length -= LONG_SIZE; + dstAddress += LONG_SIZE; + } + } + while (length >= LONG_SIZE) { + MemoryUtil.putLong(dstAddress, src.getLong()); + length -= LONG_SIZE; + dstAddress += LONG_SIZE; + } + // copy last byte + while (length > 0) { + MemoryUtil.putByte(dstAddress, src.get()); + --length; + ++dstAddress; + } + } finally { + src.order(originalByteOrder); + } + } + } + } + + /** + * Copy data into this ArrowBuf 
at a given index onwards from a source ByteBuffer starting at a + * given srcIndex for a certain length. + * + * @param index index (0 based relative to the portion of memory this ArrowBuf has access to) + * @param src src ByteBuffer where the data will be copied from + * @param srcIndex starting index in the src ByteBuffer where the data copy will start from + * @param length length of data to copy from src ByteBuffer + */ + public void setBytes(long index, ByteBuffer src, int srcIndex, int length) { + // bound check for this ArrowBuf where the data will be copied into + checkIndex(index, length); + if (src.isDirect()) { + // copy length bytes of data from src ByteBuffer starting at address + // srcAddress into this ArrowBuf at address dstAddress + final long srcAddress = MemoryUtil.getByteBufferAddress(src) + srcIndex; + final long dstAddress = addr(index); + MemoryUtil.copyMemory(srcAddress, dstAddress, length); + } else { + if (srcIndex == 0 && src.capacity() == length) { + // copy the entire ByteBuffer from start to end of length + setBytes(index, src); + } else { + ByteBuffer newBuf = src.duplicate(); + newBuf.position(srcIndex); + newBuf.limit(srcIndex + length); + setBytes(index, newBuf); + } + } + } + + /** + * Copy a given length of data from this ArrowBuf starting at a given index into a dst ArrowBuf at + * dstIndex. 
+ * + * @param index index (0 based relative to the portion of memory this ArrowBuf has access to) + * @param dst dst ArrowBuf where the data will be copied into + * @param dstIndex index (0 based relative to the portion of memory dst ArrowBuf has access to) + * @param length length of data to copy + */ + public void getBytes(long index, ArrowBuf dst, long dstIndex, int length) { + // bound check for this ArrowBuf where the data will be copied from + checkIndex(index, length); + // bound check for this ArrowBuf where the data will be copied into + Preconditions.checkArgument(dst != null, "expecting a valid ArrowBuf"); + // bound check for dst ArrowBuf + if (isOutOfBounds(dstIndex, length, dst.capacity())) { + throw new IndexOutOfBoundsException( + String.format( + "index: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.capacity())); + } + if (length != 0) { + // copy length bytes of data from this ArrowBuf starting at + // address srcAddress into dst ArrowBuf starting at address + // dstAddress + final long srcAddress = addr(index); + final long dstAddress = dst.memoryAddress() + (long) dstIndex; + MemoryUtil.copyMemory(srcAddress, dstAddress, length); + } + } + + /** + * Copy data from src ArrowBuf starting at index srcIndex into this ArrowBuf at given index. 
+ * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param src src ArrowBuf where the data will be copied from + * @param srcIndex starting index in the src ArrowBuf where the copy will begin from + * @param length length of data to copy from src ArrowBuf + */ + public void setBytes(long index, ArrowBuf src, long srcIndex, long length) { + // bound check for this ArrowBuf where the data will be copied into + checkIndex(index, length); + // null check + Preconditions.checkArgument(src != null, "expecting a valid ArrowBuf"); + // bound check for src ArrowBuf + if (isOutOfBounds(srcIndex, length, src.capacity())) { + throw new IndexOutOfBoundsException( + String.format( + "index: %d, length: %d (expected: range(0, %d))", srcIndex, length, src.capacity())); + } + if (length != 0) { + // copy length bytes of data from src ArrowBuf starting at + // address srcAddress into this ArrowBuf starting at address + // dstAddress + final long srcAddress = src.memoryAddress() + srcIndex; + final long dstAddress = addr(index); + MemoryUtil.copyMemory(srcAddress, dstAddress, length); + } + } + + /** + * Copy readableBytes() number of bytes from src ArrowBuf starting from its readerIndex into this + * ArrowBuf starting at the given index. 
+ * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param src src ArrowBuf where the data will be copied from + */ + public void setBytes(long index, ArrowBuf src) { + // null check + Preconditions.checkArgument(src != null, "expecting valid ArrowBuf"); + final long length = src.readableBytes(); + // bound check for this ArrowBuf where the data will be copied into + checkIndex(index, length); + final long srcAddress = src.memoryAddress() + src.readerIndex; + final long dstAddress = addr(index); + MemoryUtil.copyMemory(srcAddress, dstAddress, length); + src.readerIndex(src.readerIndex + length); + } + + /** + * Copy a certain length of bytes from given InputStream into this ArrowBuf at the provided index. + * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param in src stream to copy from + * @param length length of data to copy + * @return number of bytes copied from stream into ArrowBuf + * @throws IOException on failing to read from stream + */ + public int setBytes(long index, InputStream in, int length) throws IOException { + Preconditions.checkArgument(in != null, "expecting valid input stream"); + checkIndex(index, length); + int readBytes = 0; + if (length > 0) { + byte[] tmp = new byte[length]; + // read the data from input stream into tmp byte array + readBytes = in.read(tmp); + if (readBytes > 0) { + // copy readBytes length of data from the tmp byte array starting + // at srcIndex 0 into this ArrowBuf starting at address addr(index) + MemoryUtil.copyToMemory(tmp, 0, addr(index), readBytes); + } + } + return readBytes; + } + + /** + * Copy a certain length of bytes from this ArrowBuf at a given index into the given OutputStream. 
+ * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param out dst stream to copy data into + * @param length length of data to copy + * @throws IOException on failing to write to stream + */ + public void getBytes(long index, OutputStream out, int length) throws IOException { + Preconditions.checkArgument(out != null, "expecting valid output stream"); + checkIndex(index, length); + if (length > 0) { + // copy length bytes of data from this ArrowBuf starting at + // address addr(index) into the tmp byte array starting at index 0 + byte[] tmp = new byte[length]; + MemoryUtil.copyFromMemory(addr(index), tmp, 0, length); + // write the copied data to output stream + out.write(tmp); + } + } + + @Override + public void close() { + referenceManager.release(); + } + + /** + * Returns the possible memory consumed by this ArrowBuf in the worse case scenario. (not shared, + * connected to larger underlying buffer of allocated memory) + * + * @return Size in bytes. + */ + public long getPossibleMemoryConsumed() { + return referenceManager.getSize(); + } + + /** + * Return that is Accounted for by this buffer (and its potentially shared siblings within the + * context of the associated allocator). + * + * @return Size in bytes. + */ + public long getActualMemoryConsumed() { + return referenceManager.getAccountedSize(); + } + + /** + * Return the buffer's byte contents in the form of a hex dump. + * + * @param start the starting byte index + * @param length how many bytes to log + * @return A hex dump in a String. 
+ */ + public String toHexString(final long start, final int length) { + final long roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW; + + final StringBuilder sb = new StringBuilder("buffer byte dump\n"); + long index = roundedStart; + for (long nLogged = 0; nLogged < length; nLogged += LOG_BYTES_PER_ROW) { + sb.append(String.format(" [%05d-%05d]", index, index + LOG_BYTES_PER_ROW - 1)); + for (int i = 0; i < LOG_BYTES_PER_ROW; ++i) { + try { + final byte b = getByte(index++); + sb.append(String.format(" 0x%02x", b)); + } catch (IndexOutOfBoundsException ioob) { + sb.append(" "); + } + } + sb.append('\n'); + } + return sb.toString(); + } + + /** + * Get the integer id assigned to this ArrowBuf for debugging purposes. + * + * @return integer id + */ + public long getId() { + return id; + } + + /** + * Print information of this buffer into sb at the given indentation and verbosity + * level. + * + *

It will include history if BaseAllocator.DEBUG is true and the + * verbosity.includeHistoricalLog are true. + */ + @VisibleForTesting + public void print(StringBuilder sb, int indent, Verbosity verbosity) { + CommonUtil.indent(sb, indent).append(toString()); + + if (historicalLog != null && verbosity.includeHistoricalLog) { + sb.append("\n"); + historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces); + } + } + + /** + * Print detailed information of this buffer into sb. + * + *

Most information will only be present if BaseAllocator.DEBUG is true. + */ + public void print(StringBuilder sb, int indent) { + print(sb, indent, Verbosity.LOG_WITH_STACKTRACE); + } + + /** + * Get the index at which the next byte will be read from. + * + * @return reader index + */ + public long readerIndex() { + return readerIndex; + } + + /** + * Get the index at which next byte will be written to. + * + * @return writer index + */ + public long writerIndex() { + return writerIndex; + } + + /** + * Set the reader index for this ArrowBuf. + * + * @param readerIndex new reader index + * @return this ArrowBuf + */ + public ArrowBuf readerIndex(long readerIndex) { + this.readerIndex = readerIndex; + return this; + } + + /** + * Set the writer index for this ArrowBuf. + * + * @param writerIndex new writer index + * @return this ArrowBuf + */ + public ArrowBuf writerIndex(long writerIndex) { + this.writerIndex = writerIndex; + return this; + } + + /** + * Zero-out the bytes in this ArrowBuf starting at the given index for the given length. + * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param length length of bytes to zero-out + * @return this ArrowBuf + */ + public ArrowBuf setZero(long index, long length) { + if (length != 0) { + this.checkIndex(index, length); + MemoryUtil.setMemory(this.addr + index, length, (byte) 0); + } + return this; + } + + /** + * Sets all bits to one in the specified range. + * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param length length of bytes to set. + * @return this ArrowBuf + * @deprecated use {@link ArrowBuf#setOne(long, long)} instead. + */ + @Deprecated + public ArrowBuf setOne(int index, int length) { + if (length != 0) { + this.checkIndex(index, length); + MemoryUtil.setMemory(this.addr + index, length, (byte) 0xff); + } + return this; + } + + /** + * Sets all bits to one in the specified range. 
+ * + * @param index index index (0 based relative to the portion of memory this ArrowBuf has access + * to) + * @param length length of bytes to set. + * @return this ArrowBuf + */ + public ArrowBuf setOne(long index, long length) { + if (length != 0) { + this.checkIndex(index, length); + MemoryUtil.setMemory(this.addr + index, length, (byte) 0xff); + } + return this; + } + + /** + * Returns this if size is less than {@link #capacity()}, otherwise delegates to + * {@link BufferManager#replace(ArrowBuf, long)} to get a new buffer. + */ + public ArrowBuf reallocIfNeeded(final long size) { + Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative"); + if (this.capacity() >= size) { + return this; + } + if (bufferManager != null) { + return bufferManager.replace(this, size); + } else { + throw new UnsupportedOperationException( + "Realloc is only available in the context of operator's UDFs"); + } + } + + public ArrowBuf clear() { + this.readerIndex = this.writerIndex = 0; + return this; + } +} diff --git a/src/main/java/org/apache/arrow/memory/DatabricksAllocationReservation.java b/src/main/java/org/apache/arrow/memory/DatabricksAllocationReservation.java new file mode 100644 index 000000000..353d45e55 --- /dev/null +++ b/src/main/java/org/apache/arrow/memory/DatabricksAllocationReservation.java @@ -0,0 +1,104 @@ +package org.apache.arrow.memory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** An AllocationReservation implementation for cumulative allocation requests. 
*/ +class DatabricksAllocationReservation implements AllocationReservation { + + private final DatabricksBufferAllocator allocator; + private final AtomicLong reservedSize = new AtomicLong(0); + private final AtomicBoolean used = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + + public DatabricksAllocationReservation(DatabricksBufferAllocator allocator) { + this.allocator = allocator; + } + + @SuppressWarnings("removal") + @Override + @Deprecated + public boolean add(int nBytes) { + return add((long) nBytes); + } + + @Override + public boolean add(long nBytes) { + assertNotUsed(); + if (nBytes < 0) { + return false; + } + reservedSize.addAndGet(nBytes); + return true; + } + + @SuppressWarnings("removal") + @Override + @Deprecated + public boolean reserve(int nBytes) { + return reserve((long) nBytes); + } + + @Override + public boolean reserve(long nBytes) { + assertNotUsed(); + if (nBytes < 0) { + return false; + } + // Check if reservation would exceed limits + long currentReservation = reservedSize.get(); + long newReservation = currentReservation + nBytes; + if (newReservation > allocator.getHeadroom() + currentReservation) { + return false; + } + reservedSize.addAndGet(nBytes); + return true; + } + + @Override + public ArrowBuf allocateBuffer() { + assertNotUsed(); + if (!used.compareAndSet(false, true)) { + throw new IllegalStateException("Reservation already used"); + } + long size = reservedSize.get(); + if (size == 0) { + return allocator.getEmpty(); + } + return allocator.buffer(size); + } + + @Override + public int getSize() { + return (int) Math.min(reservedSize.get(), Integer.MAX_VALUE); + } + + @Override + public long getSizeLong() { + return reservedSize.get(); + } + + @Override + public boolean isUsed() { + return used.get(); + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + @Override + public void close() { + closed.set(true); + } + + private void assertNotUsed() { + if 
(used.get()) { + throw new IllegalStateException("Reservation already used"); + } + if (closed.get()) { + throw new IllegalStateException("Reservation is closed"); + } + } +} diff --git a/src/main/java/org/apache/arrow/memory/DatabricksArrowBuf.java b/src/main/java/org/apache/arrow/memory/DatabricksArrowBuf.java new file mode 100644 index 000000000..bd2d1a8fa --- /dev/null +++ b/src/main/java/org/apache/arrow/memory/DatabricksArrowBuf.java @@ -0,0 +1,738 @@ +package org.apache.arrow.memory; + +import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.arrow.memory.util.CommonUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.util.VisibleForTesting; + +/** + * A ByteBuffer-backed implementation of ArrowBuf that does not use unsafe memory operations. This + * implementation uses standard java.nio.ByteBuffer for all memory operations instead of + * MemoryUtil/Unsafe-based direct memory access. + */ +public class DatabricksArrowBuf extends ArrowBuf { + + private static final int SHORT_SIZE = Short.BYTES; + private static final int INT_SIZE = Integer.BYTES; + private static final int FLOAT_SIZE = Float.BYTES; + private static final int DOUBLE_SIZE = Double.BYTES; + private static final int LONG_SIZE = Long.BYTES; + private static final int LOG_BYTES_PER_ROW = 10; + + private final ByteBuffer byteBuffer; + private final ReferenceManager referenceManager; + private final BufferManager bufferManager; + private final int offset; // offset within the underlying ByteBuffer for sliced buffers + private volatile long capacity; + private long readerIndex; + private long writerIndex; + + /** + * Memory address used to instantiate the super class {@code ArrowBuf}. Unused in this class. 
+ */ + private static final int MEMORY_ADDRESS = 0; + + /** + * ArrowBuf uses native order, copying the same logic here. + */ + private static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder(); + + /** + * Constructs a new DatabricksArrowBuf backed by a heap ByteBuffer. + * + * @param referenceManager The memory manager to track memory usage and reference count + * @param bufferManager The buffer manager for reallocation support + * @param capacity The capacity in bytes of this buffer + */ + public DatabricksArrowBuf( + ReferenceManager referenceManager, + BufferManager bufferManager, + long capacity) { + super(referenceManager, bufferManager, capacity, MEMORY_ADDRESS); + + this.referenceManager = referenceManager; + this.bufferManager = bufferManager; + this.capacity = capacity; + this.offset = 0; + this.readerIndex = 0; + this.writerIndex = 0; + + if (capacity > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "DatabricksArrowBuf does not support capacity > Integer.MAX_VALUE"); + } + + this.byteBuffer = ByteBuffer.allocate((int) capacity); + this.byteBuffer.order(BYTE_ORDER); + } + + /** + * Constructor for creating sliced views or derived buffers that share an underlying ByteBuffer. 
+ * + * @param referenceManager The memory manager + * @param bufferManager The buffer manager + * @param byteBuffer The underlying ByteBuffer (shared with parent) + * @param offset The offset within the ByteBuffer + * @param capacity The capacity in bytes of this buffer + */ + DatabricksArrowBuf( + ReferenceManager referenceManager, + BufferManager bufferManager, + ByteBuffer byteBuffer, + int offset, + long capacity) { + super(referenceManager, bufferManager, capacity, MEMORY_ADDRESS); + + this.referenceManager = referenceManager; + this.bufferManager = bufferManager; + this.byteBuffer = byteBuffer; + this.offset = offset; + this.capacity = capacity; + this.readerIndex = 0; + this.writerIndex = 0; + } + + @Override + public int refCnt() { + return referenceManager.getRefCount(); + } + + @Override + public void checkBytes(long start, long end) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + checkIndexD(start, end - start); + } + } + + @Override + public ReferenceManager getReferenceManager() { + return referenceManager; + } + + @Override + public long capacity() { + return capacity; + } + + @Override + public synchronized ArrowBuf capacity(long newCapacity) { + if (newCapacity == capacity) { + return this; + } + + Preconditions.checkArgument(newCapacity >= 0); + + if (newCapacity >= capacity) { + throw new UnsupportedOperationException( + "Buffers don't support resizing that increases the size."); + } + + this.capacity = newCapacity; + return this; + } + + @Override + public ByteOrder order() { + return BYTE_ORDER; + } + + @Override + public long readableBytes() { + Preconditions.checkState( + writerIndex >= readerIndex, "Writer index cannot be less than reader index"); + return writerIndex - readerIndex; + } + + @Override + public long writableBytes() { + return capacity() - writerIndex; + } + + @Override + public ArrowBuf slice() { + return slice(readerIndex, readableBytes()); + } + + @Override + public ArrowBuf slice(long index, long length) { + 
Preconditions.checkPositionIndex(index, this.capacity); + Preconditions.checkPositionIndex(index + length, this.capacity); + + // Delegate to reference manager's deriveBuffer to ensure consistent behavior + // with reference counting semantics (derived buffers share ref count with parent) + final ArrowBuf newBuf = referenceManager.deriveBuffer(this, index, length); + newBuf.writerIndex(length); + return newBuf; + } + + @Override + public ByteBuffer nioBuffer() { + return nioBuffer(readerIndex, checkedCastToInt(readableBytes())); + } + + @Override + public ByteBuffer nioBuffer(long index, int length) { + chk(index, length); + ByteBuffer duplicate = byteBuffer.duplicate(); + duplicate.order(BYTE_ORDER); + duplicate.position(offset + (int)index); + duplicate.limit(offset + (int)index + length); + return duplicate; + } + + @Override + public long memoryAddress() { + return MEMORY_ADDRESS; + } + + @Override + public String toString() { + return String.format("DatabricksArrowBuf capacity:%d, offset:%d", capacity, offset); + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return this == obj; + } + + private int bufferIndex(long index) { + return offset + (int) index; + } + + private void chk(long index, long length) { + if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { + checkIndexD(index, length); + } + } + + private void checkIndexD(long index, long fieldLength) { + Preconditions.checkArgument(fieldLength >= 0, "expecting non-negative data length"); + if (index < 0 || index > capacity() - fieldLength) { + throw new IndexOutOfBoundsException( + String.format( + "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); + } + } + + // --- Primitive get/set operations using ByteBuffer --- + + @Override + public long getLong(long index) { + chk(index, LONG_SIZE); + return byteBuffer.getLong(bufferIndex(index)); + } + + @Override + public void setLong(long index, 
long value) { + chk(index, LONG_SIZE); + byteBuffer.putLong(bufferIndex(index), value); + } + + @Override + public float getFloat(long index) { + chk(index, FLOAT_SIZE); + return byteBuffer.getFloat(bufferIndex(index)); + } + + @Override + public void setFloat(long index, float value) { + chk(index, FLOAT_SIZE); + byteBuffer.putFloat(bufferIndex(index), value); + } + + @Override + public double getDouble(long index) { + chk(index, DOUBLE_SIZE); + return byteBuffer.getDouble(bufferIndex(index)); + } + + @Override + public void setDouble(long index, double value) { + chk(index, DOUBLE_SIZE); + byteBuffer.putDouble(bufferIndex(index), value); + } + + @Override + public char getChar(long index) { + chk(index, SHORT_SIZE); + return byteBuffer.getChar(bufferIndex(index)); + } + + @Override + public void setChar(long index, int value) { + chk(index, SHORT_SIZE); + byteBuffer.putChar(bufferIndex(index), (char) value); + } + + @Override + public int getInt(long index) { + chk(index, INT_SIZE); + return byteBuffer.getInt(bufferIndex(index)); + } + + @Override + public void setInt(long index, int value) { + chk(index, INT_SIZE); + byteBuffer.putInt(bufferIndex(index), value); + } + + @Override + public short getShort(long index) { + chk(index, SHORT_SIZE); + return byteBuffer.getShort(bufferIndex(index)); + } + + @Override + public void setShort(long index, int value) { + setShort(index, (short) value); + } + + @Override + public void setShort(long index, short value) { + chk(index, SHORT_SIZE); + byteBuffer.putShort(bufferIndex(index), value); + } + + @Override + public void setByte(long index, int value) { + chk(index, 1); + byteBuffer.put(bufferIndex(index), (byte) value); + } + + @Override + public void setByte(long index, byte value) { + chk(index, 1); + byteBuffer.put(bufferIndex(index), value); + } + + @Override + public byte getByte(long index) { + chk(index, 1); + return byteBuffer.get(bufferIndex(index)); + } + + // --- Writer index based operations --- + + private 
  void ensureWritable(final int length) {
    // Throws if fewer than `length` bytes remain between writerIndex and capacity.
    // All bounds checks are skipped entirely when BoundsChecking is disabled.
    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
      Preconditions.checkArgument(length >= 0, "expecting non-negative length");
      if (length > writableBytes()) {
        throw new IndexOutOfBoundsException(
            String.format(
                "writerIndex(%d) + length(%d) exceeds capacity(%d)",
                writerIndex, length, capacity()));
      }
    }
  }

  // Throws if fewer than `length` readable bytes remain between readerIndex and writerIndex.
  private void ensureReadable(final int length) {
    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
      Preconditions.checkArgument(length >= 0, "expecting non-negative length");
      if (length > readableBytes()) {
        throw new IndexOutOfBoundsException(
            String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
                readerIndex, length, writerIndex));
      }
    }
  }

  /** Reads one byte at readerIndex and advances readerIndex by one. */
  @Override
  public byte readByte() {
    ensureReadable(1);
    final byte b = getByte(readerIndex);
    ++readerIndex;
    return b;
  }

  /** Fills {@code dst} entirely from readerIndex and advances readerIndex by dst.length. */
  @Override
  public void readBytes(byte[] dst) {
    Preconditions.checkArgument(dst != null, "expecting valid dst bytearray");
    ensureReadable(dst.length);
    getBytes(readerIndex, dst, 0, dst.length);
    readerIndex += dst.length;
  }

  @Override
  public void writeByte(byte value) {
    ensureWritable(1);
    byteBuffer.put(bufferIndex(writerIndex), value);
    ++writerIndex;
  }

  @Override
  public void writeByte(int value) {
    ensureWritable(1);
    byteBuffer.put(bufferIndex(writerIndex), (byte) value);
    ++writerIndex;
  }

  @Override
  public void writeBytes(byte[] src) {
    Preconditions.checkArgument(src != null, "expecting valid src array");
    writeBytes(src, 0, src.length);
  }

  @Override
  public void writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
  }

  @Override
  public void writeShort(int value) {
    ensureWritable(SHORT_SIZE);
    byteBuffer.putShort(bufferIndex(writerIndex), (short) value);
    writerIndex += SHORT_SIZE;
  }

  @Override
  public void writeInt(int value) {
    ensureWritable(INT_SIZE);
    byteBuffer.putInt(bufferIndex(writerIndex), value);
    writerIndex += INT_SIZE;
  }

  @Override
  public void writeLong(long value) {
    ensureWritable(LONG_SIZE);
    byteBuffer.putLong(bufferIndex(writerIndex), value);
    writerIndex += LONG_SIZE;
  }

  @Override
  public void writeFloat(float value) {
    ensureWritable(FLOAT_SIZE);
    byteBuffer.putFloat(bufferIndex(writerIndex), value);
    writerIndex += FLOAT_SIZE;
  }

  @Override
  public void writeDouble(double value) {
    ensureWritable(DOUBLE_SIZE);
    byteBuffer.putDouble(bufferIndex(writerIndex), value);
    writerIndex += DOUBLE_SIZE;
  }

  // --- Bulk byte array operations ---

  // Overflow-safe range check in one expression: the OR of the terms is negative
  // iff any term is negative, i.e. iff index/length are negative or index+length
  // overflows or exceeds capacity.
  private static boolean isOutOfBounds(long index, long length, long capacity) {
    return (index | length | (index + length) | (capacity - (index + length))) < 0;
  }

  private void checkIndex(long index, long fieldLength) {
    if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
      if (isOutOfBounds(index, fieldLength, this.capacity())) {
        throw new IndexOutOfBoundsException(
            String.format(
                "index: %d, length: %d (expected: range(0, %d))",
                index, fieldLength, this.capacity()));
      }
    }
  }

  @Override
  public void getBytes(long index, byte[] dst) {
    getBytes(index, dst, 0, dst.length);
  }

  @Override
  public void getBytes(long index, byte[] dst, int dstIndex, int length) {
    checkIndex(index, length);
    Preconditions.checkArgument(dst != null, "expecting a valid dst byte array");
    if (isOutOfBounds(dstIndex, length, dst.length)) {
      throw new IndexOutOfBoundsException(
          "Not enough space to copy data into destination" + dstIndex);
    }
    if (length != 0) {
      // Use absolute positioning to avoid affecting buffer state
      ByteBuffer duplicate = byteBuffer.duplicate();
      duplicate.position(bufferIndex(index));
      duplicate.get(dst, dstIndex, length);
    }
  }

  @Override
  public void setBytes(long index, byte[] src) {
    setBytes(index, src, 0, src.length);
  }

  @Override
  public void
  setBytes(long index, byte[] src, int srcIndex, long length) {
    checkIndex(index, length);
    Preconditions.checkArgument(src != null, "expecting a valid src byte array");
    if (isOutOfBounds(srcIndex, length, src.length)) {
      throw new IndexOutOfBoundsException(
          "Not enough space to copy data from byte array" + srcIndex);
    }
    if (length > 0) {
      // Duplicate so the shared ByteBuffer's position/limit are never mutated.
      ByteBuffer duplicate = byteBuffer.duplicate();
      duplicate.position(bufferIndex(index));
      duplicate.put(src, srcIndex, (int) length);
    }
  }

  /** Copies dst.remaining() bytes starting at index into {@code dst}, advancing dst's position. */
  @Override
  public void getBytes(long index, ByteBuffer dst) {
    checkIndex(index, dst.remaining());
    if (dst.remaining() != 0) {
      int length = dst.remaining();
      ByteBuffer duplicate = byteBuffer.duplicate();
      duplicate.position(bufferIndex(index));
      duplicate.limit(bufferIndex(index) + length);
      dst.put(duplicate);
    }
  }

  /** Copies src.remaining() bytes into this buffer at index; consumes src's position. */
  @Override
  public void setBytes(long index, ByteBuffer src) {
    checkIndex(index, src.remaining());
    int length = src.remaining();
    if (length != 0) {
      ByteBuffer duplicate = byteBuffer.duplicate();
      duplicate.position(bufferIndex(index));
      duplicate.put(src);
    }
  }

  /** Copies {@code length} bytes from src[srcIndex..) without touching src's position. */
  @Override
  public void setBytes(long index, ByteBuffer src, int srcIndex, int length) {
    checkIndex(index, length);
    if (length != 0) {
      ByteBuffer srcDuplicate = src.duplicate();
      srcDuplicate.position(srcIndex);
      srcDuplicate.limit(srcIndex + length);

      ByteBuffer duplicate = byteBuffer.duplicate();
      duplicate.position(bufferIndex(index));
      duplicate.put(srcDuplicate);
    }
  }

  // Cross-buffer copy via a temporary heap array; dst must be a DatabricksArrowBuf.
  @Override
  public void getBytes(long index, ArrowBuf dst, long dstIndex, int length) {
    checkIndex(index, length);
    Preconditions.checkArgument(dst != null, "expecting a valid ArrowBuf");
    checkBufferType(dst);
    if (isOutOfBounds(dstIndex, length, dst.capacity())) {
      throw new IndexOutOfBoundsException(
          String.format(
              "index: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.capacity()));
    }
    if (length != 0) {
      byte[] tmp = new byte[length];
      getBytes(index, tmp, 0, length);
      dst.setBytes(dstIndex, tmp, 0, length);
    }
  }

  @Override
  public void setBytes(long index, ArrowBuf src, long srcIndex, long length) {
    checkIndex(index, length);
    Preconditions.checkArgument(src != null, "expecting a valid ArrowBuf");
    checkBufferType(src);
    if (isOutOfBounds(srcIndex, length, src.capacity())) {
      throw new IndexOutOfBoundsException(
          String.format(
              "index: %d, length: %d (expected: range(0, %d))", srcIndex, length, src.capacity()));
    }
    if (length != 0) {
      byte[] tmp = new byte[(int) length];
      src.getBytes(srcIndex, tmp, 0, (int) length);
      setBytes(index, tmp, 0, length);
    }
  }

  /** Drains src's readable bytes into this buffer at index, advancing src's readerIndex. */
  @Override
  public void setBytes(long index, ArrowBuf src) {
    Preconditions.checkArgument(src != null, "expecting valid ArrowBuf");
    checkBufferType(src);

    final long length = src.readableBytes();
    checkIndex(index, length);
    byte[] tmp = new byte[(int) length];
    src.getBytes(src.readerIndex(), tmp, 0, (int) length);
    setBytes(index, tmp, 0, length);
    src.readerIndex(src.readerIndex() + length);
  }

  /**
   * Reads up to {@code length} bytes from the stream into this buffer at index.
   *
   * @return the number of bytes actually read (a single read() call; may be less than length)
   */
  @Override
  public int setBytes(long index, InputStream in, int length) throws IOException {
    Preconditions.checkArgument(in != null, "expecting valid input stream");
    checkIndex(index, length);
    int readBytes = 0;
    if (length > 0) {
      byte[] tmp = new byte[length];
      readBytes = in.read(tmp);
      if (readBytes > 0) {
        setBytes(index, tmp, 0, readBytes);
      }
    }
    return readBytes;
  }

  @Override
  public void getBytes(long index, OutputStream out, int length) throws IOException {
    Preconditions.checkArgument(out != null, "expecting valid output stream");
    checkIndex(index, length);
    if (length > 0) {
      byte[] tmp = new byte[length];
      getBytes(index, tmp, 0, length);
      out.write(tmp);
    }
  }

  @Override
  public void close() {
    referenceManager.release();
  }

  @Override
  public long getPossibleMemoryConsumed() {
    return referenceManager.getSize();
  }

  @Override
  public long
getActualMemoryConsumed() { + return referenceManager.getAccountedSize(); + } + + @Override + public String toHexString(final long start, final int length) { + final long roundedStart = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW; + + final StringBuilder sb = new StringBuilder("buffer byte dump\n"); + long index = roundedStart; + for (long nLogged = 0; nLogged < length; nLogged += LOG_BYTES_PER_ROW) { + sb.append(String.format(" [%05d-%05d]", index, index + LOG_BYTES_PER_ROW - 1)); + for (int i = 0; i < LOG_BYTES_PER_ROW; ++i) { + try { + final byte b = getByte(index++); + sb.append(String.format(" 0x%02x", b)); + } catch (IndexOutOfBoundsException ioob) { + sb.append(" "); + } + } + sb.append('\n'); + } + return sb.toString(); + } + + @Override + @VisibleForTesting + public void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { + CommonUtil.indent(sb, indent).append(toString()); + } + + @Override + public void print(StringBuilder sb, int indent) { + print(sb, indent, BaseAllocator.Verbosity.LOG_WITH_STACKTRACE); + } + + @Override + public long readerIndex() { + return readerIndex; + } + + @Override + public long writerIndex() { + return writerIndex; + } + + @Override + public ArrowBuf readerIndex(long readerIndex) { + this.readerIndex = readerIndex; + return this; + } + + @Override + public ArrowBuf writerIndex(long writerIndex) { + this.writerIndex = writerIndex; + return this; + } + + @Override + public ArrowBuf setZero(long index, long length) { + if (length != 0) { + this.checkIndex(index, length); + // Fill with zeros using Arrays.fill on the backing array + int startIdx = bufferIndex(index); + int endIdx = startIdx + (int) length; + Arrays.fill(byteBuffer.array(), startIdx, endIdx, (byte) 0); + } + return this; + } + + @Override + @Deprecated + public ArrowBuf setOne(int index, int length) { + return setOne((long) index, (long) length); + } + + @Override + public ArrowBuf setOne(long index, long length) { + if (length != 0) { + 
this.checkIndex(index, length); + int startIdx = bufferIndex(index); + int endIdx = startIdx + (int) length; + Arrays.fill(byteBuffer.array(), startIdx, endIdx, (byte) 0xff); + } + return this; + } + + @Override + public ArrowBuf reallocIfNeeded(final long size) { + Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative"); + if (this.capacity() >= size) { + return this; + } + if (bufferManager != null) { + return bufferManager.replace(this, size); + } else { + throw new UnsupportedOperationException( + "Realloc is only available in the context of operator's UDFs"); + } + } + + @Override + public ArrowBuf clear() { + this.readerIndex = this.writerIndex = 0; + return this; + } + + /** + * Returns the offset within the underlying ByteBuffer where this buffer's data starts. This is + * used for sliced buffers that share the same underlying ByteBuffer. + * + * @return the offset in bytes + */ + public int getOffset() { + return offset; + } + + /** + * @return the underlying ByteBuffer. 
   */
  ByteBuffer getByteBuffer() {
    return byteBuffer;
  }

  // Guards cross-buffer operations: this heap-backed implementation can only
  // interoperate with other DatabricksArrowBuf instances.
  private void checkBufferType(ArrowBuf buffer) {
    if (!(buffer instanceof DatabricksArrowBuf)) {
      throw new IllegalArgumentException("Buffer should be an instance of DatabricksArrowBuf");
    }
  }
}
diff --git a/src/main/java/org/apache/arrow/memory/DatabricksBufferAllocator.java b/src/main/java/org/apache/arrow/memory/DatabricksBufferAllocator.java
new file mode 100644
index 000000000..bf5e33840
--- /dev/null
+++ b/src/main/java/org/apache/arrow/memory/DatabricksBufferAllocator.java
@@ -0,0 +1,230 @@
package org.apache.arrow.memory;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.arrow.memory.rounding.DefaultRoundingPolicy;
import org.apache.arrow.memory.rounding.RoundingPolicy;
import org.apache.arrow.util.Preconditions;

/**
 * A BufferAllocator implementation that uses DatabricksArrowBuf for memory allocation. This
 * allocator uses heap-based ByteBuffer storage instead of direct/off-heap memory, avoiding the need
 * for sun.misc.Unsafe operations.
 *
 * <p>This implementation is suitable for environments where direct memory access is restricted or
 * where heap-based memory management is preferred.
 */
public class DatabricksBufferAllocator implements BufferAllocator {
  private final String name;
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final DatabricksBufferAllocator parent;
  // NOTE(review): generic type parameters were stripped by extraction;
  // Set<DatabricksBufferAllocator> reconstructed — confirm against the original patch.
  private final Set<DatabricksBufferAllocator> children = ConcurrentHashMap.newKeySet();

  // Empty buffer singleton
  private final ArrowBuf emptyBuffer;

  /** Creates a root allocator with default settings. */
  public DatabricksBufferAllocator() {
    this("DatabricksBufferAllocator");
  }

  /**
   * Creates a root allocator with the given name. (This allocator does no accounting, so
   * there is no limit to specify.)
   *
   * @param name the allocator name
   */
  public DatabricksBufferAllocator(String name) {
    this(name, null);
  }

  /**
   * Creates an allocator with full configuration.
   *
   * @param name the allocator name
   * @param parent the parent allocator (null for root)
   */
  public DatabricksBufferAllocator(String name, DatabricksBufferAllocator parent) {
    this.name = name;
    this.parent = parent;

    // Create an empty buffer with a no-op reference manager
    this.emptyBuffer = new DatabricksArrowBuf(ReferenceManager.NO_OP, null, 0);
  }

  @Override
  public ArrowBuf buffer(long size) {
    return buffer(size, null);
  }

  @Override
  public ArrowBuf buffer(long size, BufferManager manager) {
    assertOpen();
    Preconditions.checkArgument(size >= 0, "Buffer size must be non-negative");

    // Zero-length requests share the singleton empty buffer.
    if (size == 0) {
      return getEmpty();
    }

    // Create the reference manager and buffer
    DatabricksReferenceManager refManager = new DatabricksReferenceManager(this, size);
    return new DatabricksArrowBuf(refManager, manager, size);
  }

  /** Walks the parent chain to the root allocator. */
  @Override
  public BufferAllocator getRoot() {
    if (parent == null) {
      return this;
    }
    return parent.getRoot();
  }

  @Override
  public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation) {
    return
  newChildAllocator(name, AllocationListener.NOOP, initReservation, maxAllocation);
  }

  // NOTE(review): listener, initReservation and maxAllocation are ignored — this
  // allocator performs no accounting. Confirm this is intentional for all callers.
  @Override
  public BufferAllocator newChildAllocator(
      String name, AllocationListener listener, long initReservation, long maxAllocation) {
    assertOpen();

    DatabricksBufferAllocator child = new DatabricksBufferAllocator(name, this);

    children.add(child);

    return child;
  }

  /** Closes this allocator and all children; idempotent via the CAS on {@code closed}. */
  @Override
  public void close() {
    if (!closed.compareAndSet(false, true)) {
      return;
    }

    // Close all children first
    for (DatabricksBufferAllocator child : children) {
      child.close();
    }
    children.clear();

    // Remove from parent's children list
    if (parent != null) {
      parent.children.remove(this);
    }
  }

  // Accounting below is a no-op: storage is heap-allocated and reclaimed by the JVM GC.
  @Override
  public long getAllocatedMemory() {
    return 0;
  }

  @Override
  public long getLimit() {
    return Integer.MAX_VALUE;
  }

  @Override
  public long getInitReservation() {
    return 0;
  }

  @Override
  public void setLimit(long newLimit) {
    // Do nothing.
  }

  @Override
  public long getPeakMemoryAllocation() {
    // Do nothing.
    return 0;
  }

  @Override
  public long getHeadroom() {
    return Integer.MAX_VALUE;
  }

  @Override
  public boolean forceAllocate(long size) {
    if (parent != null) {
      parent.forceAllocate(size);
    }
    return true;
  }

  @Override
  public void releaseBytes(long size) {
    // Do nothing.
  }

  @Override
  public AllocationListener getListener() {
    return AllocationListener.NOOP;
  }

  @Override
  public BufferAllocator getParentAllocator() {
    return parent;
  }

  @Override
  public Collection<BufferAllocator> getChildAllocators() {
    return Collections.unmodifiableSet(children);
  }

  @Override
  public AllocationReservation newReservation() {
    assertOpen();
    return new DatabricksAllocationReservation(this);
  }

  @Override
  public ArrowBuf getEmpty() {
    return emptyBuffer;
  }

  @Override
  public String getName() {
    return name;
  }

  @Override
  public boolean isOverLimit() {
    // Never over limit.
    return false;
  }

  /** Renders this allocator and its children, indented, for debugging. */
  @Override
  public String toVerboseString() {
    StringBuilder sb = new StringBuilder();
    sb.append("Allocator(").append(name).append(") ");
    if (!children.isEmpty()) {
      sb.append("\n Children:\n");
      for (DatabricksBufferAllocator child : children) {
        sb.append(" ").append(child.toVerboseString().replace("\n", "\n ")).append("\n");
      }
    }
    return sb.toString();
  }

  @Override
  public void assertOpen() {
    if (closed.get()) {
      throw new IllegalStateException("Allocator " + name + " is closed");
    }
  }

  @Override
  public RoundingPolicy getRoundingPolicy() {
    return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
  }

  @Override
  public ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
    throw new UnsupportedOperationException(
        "DatabricksBufferAllocator does not support foreign allocations");
  }
}
diff --git a/src/main/java/org/apache/arrow/memory/DatabricksReferenceManager.java b/src/main/java/org/apache/arrow/memory/DatabricksReferenceManager.java
new file mode 100644
index 000000000..0626908bb
--- /dev/null
+++ b/src/main/java/org/apache/arrow/memory/DatabricksReferenceManager.java
@@ -0,0 +1,131 @@
package org.apache.arrow.memory;

import org.apache.arrow.util.Preconditions;

/**
 * A Databricks reference manager which acts as a no-op and does not reference count. All data is
 * allocated on the heap and taken care of by the JVM garbage collector.
 */
class DatabricksReferenceManager implements ReferenceManager {
  /** Allocator of this reference manager. */
  private final DatabricksBufferAllocator allocator;

  /** Size of this reference. */
  private final long size;

  /** The memory is heap allocated and taken care of by the JVM. Assuming value of one is safe.
   */
  private static final int REF_COUNT = 1;

  public DatabricksReferenceManager(DatabricksBufferAllocator allocator, long size) {
    this.allocator = allocator;
    this.size = size;
  }

  @Override
  public int getRefCount() {
    return REF_COUNT;
  }

  @Override
  public boolean release() {
    return release(1);
  }

  // Always false: getRefCount() is a constant 1, so the buffer is never reported as
  // deallocated. The JVM GC reclaims the heap storage.
  @Override
  public boolean release(int decrement) {
    return getRefCount() == 0;
  }

  @Override
  public void retain() {
    retain(1);
  }

  @Override
  public void retain(int increment) {
    // Do nothing.
  }

  @Override
  public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
    DatabricksArrowBuf buf = checkBufferType(srcBuffer);
    return deriveBuffer(buf);
  }

  // Derives a view over the entire source buffer.
  private ArrowBuf deriveBuffer(DatabricksArrowBuf srcBuffer) {
    return deriveBuffer(srcBuffer, 0, srcBuffer.capacity());
  }

  /**
   * Creates a slice view [index, index + length) that shares the source's underlying
   * ByteBuffer; no bytes are copied.
   */
  @Override
  public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) {
    Preconditions.checkArgument(
        length <= Integer.MAX_VALUE,
        "Length %s should be less than or equal to %s",
        length,
        Integer.MAX_VALUE);

    Preconditions.checkArgument(
        index + length <= sourceBuffer.capacity(),
        "Index="
            + index
            + " and length="
            + length
            + " exceeds source buffer capacity="
            + sourceBuffer.capacity());

    // Create a new DatabricksArrowBuf sharing the same byte buffer.
    DatabricksArrowBuf buf = checkBufferType(sourceBuffer);
    return new DatabricksArrowBuf(
        this, null, buf.getByteBuffer(), buf.getOffset() + (int) index, length);
  }

  // "Transfer" is a whole-buffer derive; no accounting moves since nothing is counted.
  @Override
  public OwnershipTransferResult transferOwnership(
      ArrowBuf sourceBuffer, BufferAllocator targetAllocator) {
    DatabricksArrowBuf buf = checkBufferType(sourceBuffer);
    checkAllocatorType(targetAllocator);

    final ArrowBuf newBuf = deriveBuffer(buf);
    return new OwnershipTransferResult() {
      @Override
      public boolean getAllocationFit() {
        return true;
      }

      @Override
      public ArrowBuf getTransferredBuffer() {
        return newBuf;
      }
    };
  }

  @Override
  public BufferAllocator getAllocator() {
    return allocator;
  }

  @Override
  public long getSize() {
    return size;
  }

  @Override
  public long getAccountedSize() {
    return size;
  }

  private DatabricksArrowBuf checkBufferType(ArrowBuf buffer) {
    if (!(buffer instanceof DatabricksArrowBuf)) {
      throw new IllegalArgumentException("Buffer should be an instance of DatabricksArrowBuf");
    }
    return (DatabricksArrowBuf) buffer;
  }

  private DatabricksBufferAllocator checkAllocatorType(BufferAllocator bufferAllocator) {
    if (!(bufferAllocator instanceof DatabricksBufferAllocator)) {
      throw new IllegalArgumentException(
          "Allocator should be an instance of DatabricksBufferAllocator");
    }
    return (DatabricksBufferAllocator) bufferAllocator;
  }
}
diff --git a/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java
new file mode 100644
index 000000000..3855bc62f
--- /dev/null
+++ b/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java
@@ -0,0 +1,263 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// -------------------------------------------------------------------------
// MODIFICATION NOTICE:
// This file was modified by Databricks, Inc on 16-December-2025.
// Description of changes: Patched static initializer to not printStackTrace on failure.
// -------------------------------------------------------------------------

package org.apache.arrow.memory.util;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.checkerframework.checker.nullness.qual.Nullable;
import sun.misc.Unsafe;

/** Utilities for memory related operations. */
public class MemoryUtil {
  private static final org.slf4j.Logger logger =
      org.slf4j.LoggerFactory.getLogger(MemoryUtil.class);

  // NOTE(review): generic parameters in this file were stripped by extraction and are
  // reconstructed (Constructor<?>, PrivilegedAction<Object>) — confirm against upstream Arrow.
  private static final @Nullable Constructor<?> DIRECT_BUFFER_CONSTRUCTOR;

  /** The unsafe object from which to access the off-heap memory. */
  private static final Unsafe UNSAFE;

  /** The start offset of array data relative to the start address of the array object. */
  private static final long BYTE_ARRAY_BASE_OFFSET;

  /** The offset of the address field with the {@link ByteBuffer} object. */
  private static final long BYTE_BUFFER_ADDRESS_OFFSET;

  /** If the native byte order is little-endian. */
  public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

  // Java 1.8, 9, 11, 17, 21 becomes 1, 9, 11, 17, and 21.
  @SuppressWarnings("StringSplitter")
  private static final int majorVersion =
      Integer.parseInt(System.getProperty("java.specification.version").split("\\D+")[0]);

  static {
    try {
      // try to get the unsafe object
      final Object maybeUnsafe =
          AccessController.doPrivileged(
              new PrivilegedAction<Object>() {
                @Override
                @SuppressWarnings({"nullness:argument", "nullness:return"})
                // incompatible argument for parameter obj of Field.get
                // incompatible types in return
                public Object run() {
                  try {
                    final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
                    unsafeField.setAccessible(true);
                    return unsafeField.get(null);
                  } catch (Throwable e) {
                    return e;
                  }
                }
              });

      if (maybeUnsafe instanceof Throwable) {
        throw (Throwable) maybeUnsafe;
      }

      UNSAFE = (Unsafe) maybeUnsafe;

      // get the offset of the data inside a byte array object
      BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

      // get the offset of the address field in a java.nio.Buffer object
      Field addressField = java.nio.Buffer.class.getDeclaredField("address");
      addressField.setAccessible(true);
      BYTE_BUFFER_ADDRESS_OFFSET = UNSAFE.objectFieldOffset(addressField);

      Constructor<?> directBufferConstructor;
      long address = -1;
      final ByteBuffer direct = ByteBuffer.allocateDirect(1);
      try {

        final Object maybeDirectBufferConstructor =
            AccessController.doPrivileged(
                new PrivilegedAction<Object>() {
                  @Override
                  public Object run() {
                    try {
                      // JDK 21 changed DirectByteBuffer's hidden constructor signature to
                      // (long, long); older JDKs use (long, int).
                      final Constructor<?> constructor =
                          (majorVersion >= 21)
                              ? direct.getClass().getDeclaredConstructor(long.class, long.class)
                              : direct.getClass().getDeclaredConstructor(long.class, int.class);
                      constructor.setAccessible(true);
                      logger.debug("Constructor for direct buffer found and made accessible");
                      return constructor;
                    } catch (NoSuchMethodException e) {
                      logger.debug("Cannot get constructor for direct buffer allocation", e);
                      return e;
                    } catch (SecurityException e) {
                      logger.debug("Cannot get constructor for direct buffer allocation", e);
                      return e;
                    }
                  }
                });

        if (maybeDirectBufferConstructor instanceof Constructor) {
          address = UNSAFE.allocateMemory(1);
          // try to use the constructor now
          try {
            ((Constructor<?>) maybeDirectBufferConstructor).newInstance(address, 1);
            directBufferConstructor = (Constructor<?>) maybeDirectBufferConstructor;
            logger.debug("direct buffer constructor: available");
          } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
            logger.warn("unable to instantiate a direct buffer via constructor", e);
            directBufferConstructor = null;
          }
        } else {
          logger.debug(
              "direct buffer constructor: unavailable", (Throwable) maybeDirectBufferConstructor);
          directBufferConstructor = null;
        }
      } finally {
        // Free the 1-byte probe allocation regardless of constructor availability.
        if (address != -1) {
          UNSAFE.freeMemory(address);
        }
      }
      DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor;
    } catch (Throwable e) {
      // This exception will get swallowed, but it's necessary for the static analysis that ensures
      // the static fields above get initialized
      final RuntimeException failure =
          new RuntimeException(
              "Failed to initialize MemoryUtil. You must start Java with "
                  + "`--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED` "
                  + "(See https://arrow.apache.org/docs/java/install.html)",
              e);
      // ---- Databricks patch start ----
      // ---- Remove 'failure.printStackTrace();'
      // ---- Databricks patch end ----
      throw failure;
    }
  }

  /**
   * Given a {@link ByteBuffer}, gets the address the underlying memory space.
   *
   * @param buf the byte buffer.
   * @return address of the underlying memory.
   */
  public static long getByteBufferAddress(ByteBuffer buf) {
    return UNSAFE.getLong(buf, BYTE_BUFFER_ADDRESS_OFFSET);
  }

  private MemoryUtil() {}

  /** Create nio byte buffer. */
  public static ByteBuffer directBuffer(long address, int capacity) {
    if (DIRECT_BUFFER_CONSTRUCTOR != null) {
      if (capacity < 0) {
        throw new IllegalArgumentException("Capacity is negative, has to be positive or 0");
      }
      try {
        return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
      } catch (Throwable cause) {
        throw new Error(cause);
      }
    }
    throw new UnsupportedOperationException(
        "sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available");
  }

  @SuppressWarnings(
      "nullness:argument") // to handle null assignment on third party dependency: Unsafe
  private static void copyMemory(
      @Nullable Object srcBase,
      long srcOffset,
      @Nullable Object destBase,
      long destOffset,
      long bytes) {
    UNSAFE.copyMemory(srcBase, srcOffset, destBase, destOffset, bytes);
  }

  public static void copyMemory(long srcAddress, long destAddress, long bytes) {
    UNSAFE.copyMemory(srcAddress, destAddress, bytes);
  }

  public static void copyToMemory(byte[] src, long srcIndex, long destAddress, long bytes) {
    copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, destAddress, bytes);
  }

  public static void copyFromMemory(long srcAddress, byte[] dest, long destIndex, long bytes) {
    copyMemory(null, srcAddress, dest, BYTE_ARRAY_BASE_OFFSET + destIndex, bytes);
  }

  public static byte getByte(long address) {
    return UNSAFE.getByte(address);
  }

  public static void putByte(long address, byte value) {
    UNSAFE.putByte(address, value);
  }

  public static short getShort(long address) {
    return UNSAFE.getShort(address);
  }

  public static void putShort(long address, short value) {
    UNSAFE.putShort(address, value);
  }

  public static int getInt(long address) {
    return UNSAFE.getInt(address);
  }

  public static void putInt(long address, int value) {
    UNSAFE.putInt(address, value);
  }

  public static long getLong(long address) {
    return UNSAFE.getLong(address);
  }

  public static void putLong(long address, long value) {
    UNSAFE.putLong(address, value);
  }

  public static void setMemory(long address, long bytes, byte value) {
    UNSAFE.setMemory(address, bytes, value);
  }

  public static int getInt(byte[] bytes, int index) {
    return UNSAFE.getInt(bytes, BYTE_ARRAY_BASE_OFFSET + index);
  }

  public static long getLong(byte[] bytes, int index) {
    return UNSAFE.getLong(bytes, BYTE_ARRAY_BASE_OFFSET + index);
  }

  public static long allocateMemory(long bytes) {
    return UNSAFE.allocateMemory(bytes);
  }

  public static void freeMemory(long address) {
    UNSAFE.freeMemory(address);
  }
}
diff --git a/src/main/java/org/apache/arrow/vector/util/DecimalUtility.java b/src/main/java/org/apache/arrow/vector/util/DecimalUtility.java
new file mode 100644
index 000000000..ae76865c6
--- /dev/null
+++ b/src/main/java/org/apache/arrow/vector/util/DecimalUtility.java
@@ -0,0 +1,247 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// -------------------------------------------------------------------------
// MODIFICATION NOTICE:
// This file was modified by Databricks, Inc on 16-December-2025.
// Description of changes: Patched method writeLongToArrowBuf to handle DatabricksArrowBuf.
// -------------------------------------------------------------------------

package org.apache.arrow.vector.util;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.DatabricksArrowBuf;
import org.apache.arrow.memory.util.MemoryUtil;

/** Utility methods for configurable precision Decimal values (e.g. {@link BigDecimal}). */
public class DecimalUtility {
  private DecimalUtility() {}

  public static final byte[] zeroes =
      new byte[] {
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
      };
  public static final byte[] minus_one =
      new byte[] {
        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
        -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1
      };
  private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

  /**
   * Read an ArrowType.Decimal at the given value index in the ArrowBuf and convert to a BigDecimal
   * with the given scale.
   */
  public static BigDecimal getBigDecimalFromArrowBuf(
      ArrowBuf bytebuf, int index, int scale, int byteWidth) {
    byte[] value = new byte[byteWidth];
    byte temp;
    final long startIndex = (long) index * byteWidth;

    bytebuf.getBytes(startIndex, value, 0, byteWidth);
    if (LITTLE_ENDIAN) {
      // Decimal stored as native endian, need to swap bytes to make BigDecimal if native endian is
      // LE (BigInteger expects big-endian two's-complement input)
      int stop = byteWidth / 2;
      for (int i = 0, j; i < stop; i++) {
        temp = value[i];
        j = (byteWidth - 1) - i;
        value[i] = value[j];
        value[j] = temp;
      }
    }
    BigInteger unscaledValue = new BigInteger(value);
    return new BigDecimal(unscaledValue, scale);
  }

  /**
   * Read an ArrowType.Decimal from the ByteBuffer and convert to a BigDecimal with the given scale.
   */
  public static BigDecimal getBigDecimalFromByteBuffer(
      ByteBuffer bytebuf, int scale, int byteWidth) {
    byte[] value = new byte[byteWidth];
    bytebuf.get(value);
    BigInteger unscaledValue = new BigInteger(value);
    return new BigDecimal(unscaledValue, scale);
  }

  /**
   * Read an ArrowType.Decimal from the ArrowBuf at the given value index and return it as a byte
   * array.
   */
  public static byte[] getByteArrayFromArrowBuf(ArrowBuf bytebuf, int index, int byteWidth) {
    final byte[] value = new byte[byteWidth];
    final long startIndex = (long) index * byteWidth;
    bytebuf.getBytes(startIndex, value, 0, byteWidth);
    return value;
  }

  /**
   * Check that the BigDecimal scale equals the vectorScale and that the BigDecimal precision is
   * less than or equal to the vectorPrecision. If not, then an UnsupportedOperationException is
   * thrown, otherwise returns true.
+ */ + public static boolean checkPrecisionAndScale( + BigDecimal value, int vectorPrecision, int vectorScale) { + if (value.scale() != vectorScale) { + throw new UnsupportedOperationException( + "BigDecimal scale must equal that in the Arrow vector: " + + value.scale() + + " != " + + vectorScale); + } + if (value.precision() > vectorPrecision) { + throw new UnsupportedOperationException( + "BigDecimal precision cannot be greater than that in the Arrow " + + "vector: " + + value.precision() + + " > " + + vectorPrecision); + } + return true; + } + + /** + * Check that the BigDecimal scale equals the vectorScale and that the BigDecimal precision is + * less than or equal to the vectorPrecision. Return true if so, otherwise return false. + */ + public static boolean checkPrecisionAndScaleNoThrow( + BigDecimal value, int vectorPrecision, int vectorScale) { + return value.scale() == vectorScale && value.precision() < vectorPrecision; + } + + /** + * Check that the decimal scale equals the vectorScale and that the decimal precision is less than + * or equal to the vectorPrecision. If not, then an UnsupportedOperationException is thrown, + * otherwise returns true. + */ + public static boolean checkPrecisionAndScale( + int decimalPrecision, int decimalScale, int vectorPrecision, int vectorScale) { + if (decimalScale != vectorScale) { + throw new UnsupportedOperationException( + "BigDecimal scale must equal that in the Arrow vector: " + + decimalScale + + " != " + + vectorScale); + } + if (decimalPrecision > vectorPrecision) { + throw new UnsupportedOperationException( + "BigDecimal precision cannot be greater than that in the Arrow " + + "vector: " + + decimalPrecision + + " > " + + vectorPrecision); + } + return true; + } + + /** + * Write the given BigDecimal to the ArrowBuf at the given value index. Will throw an + * UnsupportedOperationException if the decimal size is greater than the Decimal vector byte + * width. 
+ */ + public static void writeBigDecimalToArrowBuf( + BigDecimal value, ArrowBuf bytebuf, int index, int byteWidth) { + final byte[] bytes = value.unscaledValue().toByteArray(); + writeByteArrayToArrowBufHelper(bytes, bytebuf, index, byteWidth); + } + + /** + * Write the given long to the ArrowBuf at the given value index. This routine extends the + * original sign bit to a new upper area in 128-bit or 256-bit. + */ + public static void writeLongToArrowBuf(long value, ArrowBuf bytebuf, int index, int byteWidth) { + if (byteWidth != 16 && byteWidth != 32) { + throw new UnsupportedOperationException( + "DecimalUtility.writeLongToArrowBuf() currently supports " + + "128-bit or 256-bit width data"); + } + final long padValue = Long.signum(value) == -1 ? -1L : 0L; + + // ---- Databricks patch start ---- + if (bytebuf instanceof DatabricksArrowBuf) { + DatabricksArrowBuf buf = (DatabricksArrowBuf) bytebuf; + final int startIdx = index * byteWidth; + if (LITTLE_ENDIAN) { + buf.setLong(startIdx, value); + for (int i = 1; i <= (byteWidth - 8) / 8; i++) { + buf.setLong(startIdx + Long.BYTES * i, padValue); + } + } else { + for (int i = 0; i < (byteWidth - 8) / 8; i++) { + MemoryUtil.putLong(startIdx + Long.BYTES * i, padValue); + } + buf.setLong(startIdx + Long.BYTES * (byteWidth - 8) / 8, value); + } + } else { + final long addressOfValue = bytebuf.memoryAddress() + (long) index * byteWidth; + if (LITTLE_ENDIAN) { + MemoryUtil.putLong(addressOfValue, value); + for (int i = 1; i <= (byteWidth - 8) / 8; i++) { + MemoryUtil.putLong(addressOfValue + Long.BYTES * i, padValue); + } + } else { + for (int i = 0; i < (byteWidth - 8) / 8; i++) { + MemoryUtil.putLong(addressOfValue + Long.BYTES * i, padValue); + } + MemoryUtil.putLong(addressOfValue + Long.BYTES * (byteWidth - 8) / 8, value); + } + } + // ---- Databricks patch end ---- + } + + /** + * Write the given byte array to the ArrowBuf at the given value index. 
Will throw an + * UnsupportedOperationException if the decimal size is greater than the Decimal vector byte + * width. + */ + public static void writeByteArrayToArrowBuf( + byte[] bytes, ArrowBuf bytebuf, int index, int byteWidth) { + writeByteArrayToArrowBufHelper(bytes, bytebuf, index, byteWidth); + } + + private static void writeByteArrayToArrowBufHelper( + byte[] bytes, ArrowBuf bytebuf, int index, int byteWidth) { + final long startIndex = (long) index * byteWidth; + if (bytes.length > byteWidth) { + throw new UnsupportedOperationException( + "Decimal size greater than " + byteWidth + " bytes: " + bytes.length); + } + + byte[] padBytes = bytes[0] < 0 ? minus_one : zeroes; + if (LITTLE_ENDIAN) { + // Decimal stored as native-endian, need to swap data bytes before writing to ArrowBuf if LE + byte[] bytesLE = new byte[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + bytesLE[i] = bytes[bytes.length - 1 - i]; + } + + // Write LE data + bytebuf.setBytes(startIndex, bytesLE, 0, bytes.length); + bytebuf.setBytes(startIndex + bytes.length, padBytes, 0, byteWidth - bytes.length); + } else { + // Write BE data + bytebuf.setBytes(startIndex + byteWidth - bytes.length, bytes, 0, bytes.length); + bytebuf.setBytes(startIndex, padBytes, 0, byteWidth - bytes.length); + } + } +} diff --git a/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowBufferAllocatorTest.java b/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowBufferAllocatorTest.java new file mode 100644 index 000000000..7e0a3f8e6 --- /dev/null +++ b/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowBufferAllocatorTest.java @@ -0,0 +1,97 @@ +package com.databricks.jdbc.api.impl.arrow; + +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; 
+import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.DatabricksBufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledOnJre; +import org.junit.jupiter.api.condition.JRE; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Test the functionality of {@link ArrowBufferAllocator}. */ +public class ArrowBufferAllocatorTest { + /** Logger instance. */ + private static final Logger logger = LoggerFactory.getLogger(ArrowBufferAllocatorTest.class); + + /** Test that root allocator can be created. */ + @Test + public void testCreateRootAllocator() throws IOException { + try (BufferAllocator allocator = ArrowBufferAllocator.getBufferAllocator()) { + assertInstanceOf(RootAllocator.class, allocator, "Should create RootAllocator"); + readAndWriteArrowData(allocator); + } + } + + /** + * Test that the fallback {@code DatabricksBufferAllocator} is used when the creation of {@code + * RootAllocator} is not possible in the current JVM. 
+ */ + @Test + @Tag("Jvm17PlusAndArrowToNioReflectionDisabled") + @EnabledOnJre({JRE.JAVA_17, JRE.JAVA_21}) + public void testCreateDatabricksBufferAllocator() throws IOException { + try (BufferAllocator allocator = ArrowBufferAllocator.getBufferAllocator()) { + assertInstanceOf( + DatabricksBufferAllocator.class, allocator, "Should create DatabricksBufferAllocator"); + readAndWriteArrowData(allocator); + } + } + + /** Write and read a sample arrow data to validate that the BufferAllocator works. */ + private void readAndWriteArrowData(BufferAllocator allocator) throws IOException { + // 1. Write sample data. + Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null); + Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null); + Schema schemaPerson = new Schema(asList(name, age)); + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schemaPerson, allocator)) { + VarCharVector nameVector = (VarCharVector) vectorSchemaRoot.getVector("name"); + nameVector.allocateNew(3); + nameVector.set(0, "David".getBytes()); + nameVector.set(1, "Gladis".getBytes()); + nameVector.set(2, "Juan".getBytes()); + IntVector ageVector = (IntVector) vectorSchemaRoot.getVector("age"); + ageVector.allocateNew(3); + ageVector.set(0, 10); + ageVector.set(1, 20); + ageVector.set(2, 30); + vectorSchemaRoot.setRowCount(3); + ByteArrayOutputStream arrowData = new ByteArrayOutputStream(); + try (ArrowStreamWriter writer = + new ArrowStreamWriter(vectorSchemaRoot, null, Channels.newChannel(arrowData))) { + writer.start(); + writer.writeBatch(); + logger.info("Number of rows written: " + vectorSchemaRoot.getRowCount()); + } + + // 2. Read the sample data. 
+ int totalRecords = 0; + try (ArrowStreamReader reader = + new ArrowStreamReader(new ByteArrayInputStream(arrowData.toByteArray()), allocator)) { + while (reader.loadNextBatch()) { + totalRecords += reader.getVectorSchemaRoot().getRowCount(); + } + } + + assertEquals(3, totalRecords, "Read 3 records"); + } + } +}