From 48ed9e2e20f8d7729985413091f41579daf8ee09 Mon Sep 17 00:00:00 2001 From: wanghaoyang22 Date: Thu, 8 Jan 2026 12:44:06 +0800 Subject: [PATCH] Fixed the possibility of batch writing where the data written in each batch exceeds the threshold --- .../client/AsyncBufferedMutatorImpl.java | 41 +++++++++++++++---- .../hbase/client/TestAsyncBufferMutator.java | 2 +- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index 452318c1ef5a..b198aee10b08 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -117,7 +117,7 @@ public Configuration getConfiguration() { * Protected for being overridden in tests. * @return a {@link Batch} containing drained mutations and futures, or {@code null} if empty */ - protected Batch drainBatch() { + protected List drainBatch() { ArrayList toSend; ArrayList> toComplete; // Cancel the flush task if it is pending. @@ -131,10 +131,35 @@ protected Batch drainBatch() { } toComplete = this.futures; assert toSend.size() == toComplete.size(); - this.mutations = new ArrayList<>(INITIAL_CAPACITY); - this.futures = new ArrayList<>(INITIAL_CAPACITY); - bufferedSize = 0L; - return new Batch(toSend, toComplete); + List batches = new ArrayList<>(); + long thisHeapSize = 0; + int thisSendNums = 0; + ArrayList thisSend = new ArrayList<>(); + ArrayList> thisComplete = new ArrayList<>(); + for (int i = 0; i < toSend.size(); i++) { + thisSend.add(toSend.get(i)); + thisComplete.add(toComplete.get(i)); + thisHeapSize += toSend.get(i).heapSize(); + thisSendNums++; + if ((maxMutations > 0 && thisSendNums >= maxMutations) || thisHeapSize >= writeBufferSize) { + batches.add(new Batch(thisSend, thisComplete)); + thisHeapSize = 0; + thisSendNums = 0; + } + } + this.mutations = thisSendNums == 0 ? new ArrayList<>(INITIAL_CAPACITY) : new ArrayList<>(toSend.subList(toSend.size() - thisSendNums, toSend.size())); + this.futures = thisSendNums == 0 ? new ArrayList<>(INITIAL_CAPACITY) : new ArrayList<>(toComplete.subList(toComplete.size() - thisSendNums, toSend.size())); + bufferedSize = thisSendNums == 0 ? 0 : toSend.subList(toSend.size() - thisSendNums, toSend.size()).stream().mapToLong(Mutation::heapSize).sum(); + return batches; + } + + private void sendBatch(List batchs) { + if (batchs == null || batchs.isEmpty()) { + return; + } + for (Batch batch : batchs) { + sendBatch(batch); + } } /** @@ -179,12 +204,12 @@ public List> mutate(List mutations) validatePut((Put) mutation, maxKeyValueSize); } } - Batch batch = null; + List batch = null; lock.lock(); try { if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) { periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> { - Batch flushBatch = null; + List flushBatch = null; lock.lock(); try { // confirm that we are still valid, if there is already a drainBatch call before us, @@ -227,7 +252,7 @@ public List> mutate(List mutations) // The only difference bewteen flush and close is that, we will set closed to true before sending // out the batch to prevent further flush or close private void flushOrClose(boolean close) { - Batch batch = null; + List batch = null; if (!closed) { lock.lock(); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java index a6a6333cd08c..69f481e82563 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java @@ -267,7 +267,7 @@ private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutato } @Override - protected Batch drainBatch() { + protected List drainBatch() { drainCount++; return super.drainBatch(); }