ReplicationEndpoint.java
@@ -283,4 +283,29 @@ public int getTimeout() {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();

// WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
// file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
// ContinuousBackupReplicationEndpoint
// and -1 for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, every time a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
Copilot AI Jan 13, 2026
The comment references 'shouldFlushStagedWal()' but the actual method name in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency will confuse developers trying to understand the interaction between these components.

default long getMaxBufferSize() {
return -1;
}

// WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
Contributor
nit: ContinuousBackupReplicationEndpoint is part of #7591 and is not yet committed to master; should we mention that earlier in this change?

// file. So we return config value CONF_STAGED_WAL_FLUSH_INTERVAL for
// ContinuousBackupReplicationEndpoint
// and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
// For other ReplicationEndpoint, every time a WALEntryBatch is shipped, we update replication
// offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
Copilot AI Jan 13, 2026
The comment references 'shouldFlushStagedWal()' but the actual method name in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency will confuse developers trying to understand the interaction between these components.

default long maxFlushInterval() {
return Long.MAX_VALUE;
}

// Used in ContinuousBackupReplicationEndpoint to flush/close WAL backup files
default void beforePersistingReplicationOffset() {

}
}
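For illustration, a minimal sketch of how a buffering endpoint along the lines of ContinuousBackupReplicationEndpoint (introduced separately in #7591, not in this change) might override the three new hooks. The class name, threshold values, and lifecycle plumbing below are assumptions made for the sketch, not the actual implementation:

import java.util.UUID;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

public class BufferedBackupEndpointSketch extends BaseReplicationEndpoint {

  // Assumed thresholds standing in for CONF_BACKUP_MAX_WAL_SIZE and
  // CONF_STAGED_WAL_FLUSH_INTERVAL referenced in the code comments above.
  private final long maxWalSize = 128 * 1024 * 1024; // flush once 128 MB is staged
  private final long flushIntervalMs = 60_000; // or once 60 seconds have passed

  @Override
  public long getMaxBufferSize() {
    // The shipper persists the replication offset once this much WAL data is staged.
    return maxWalSize;
  }

  @Override
  public long maxFlushInterval() {
    // The shipper also persists the offset once this much time has passed since the last flush.
    return flushIntervalMs;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Flush/close the staged WAL backup file here, so the persisted offset never
    // runs ahead of durable backup data.
  }

  @Override
  public boolean replicate(ReplicateContext ctx) {
    // A real implementation would stage ctx.getEntries() into the WAL backup file.
    return true;
  }

  @Override
  public UUID getPeerUUID() {
    return UUID.randomUUID();
  }

  // Minimal lifecycle plumbing, mirroring the test endpoint added further down.
  @Override
  public void start() {
    startAsync().awaitRunning();
  }

  @Override
  public void stop() {
    stopAsync().awaitTerminated();
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }
}

Non-buffering endpoints keep the interface defaults (-1 and Long.MAX_VALUE), so the shipper still persists the offset after every shipped batch, as it did before this change.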
ReplicationSourceShipper.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -74,6 +75,10 @@ public enum WorkerState {
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;
private long stagedWalSize = 0L;
private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
private WALEntryBatch lastShippedBatch;
private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
ReplicationSourceWALReader walReader) {
@@ -98,6 +103,10 @@ public final void run() {
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
// check if flush needed for WAL backup, this is need for timeout based flush
Copilot AI Jan 13, 2026
The comment has a typo: 'need' should be 'needed'. Should read "this is needed for timeout based flush".

Suggested change
// check if flush needed for WAL backup, this is need for timeout based flush
// check if flush needed for WAL backup, this is needed for timeout based flush

if (shouldPersistLogPosition()) {
persistLogPosition();
}
// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
// The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +164,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(entryBatch);
lastShippedBatch = entryBatch;
persistLogPosition();
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +200,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);

stagedWalSize += currentSize;
entriesForCleanUpHFileRefs.addAll(entries);
lastShippedBatch = entryBatch;
if (shouldPersistLogPosition()) {
persistLogPosition();
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);

// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}

private boolean shouldPersistLogPosition() {
if (stagedWalSize == 0 || lastShippedBatch == null) {
return false;
}
return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
Copilot AI Jan 13, 2026
The condition checks if stagedWalSize is greater than or equal to getMaxBufferSize(), but getMaxBufferSize() can return -1 for non-buffering endpoints (as documented in ReplicationEndpoint). This means the comparison 'stagedWalSize >= -1' would always be true when stagedWalSize > 0, causing immediate flushes for non-buffering endpoints. While this preserves existing behavior, the intent is unclear and could be confusing. Consider explicitly checking for -1 to make the logic more explicit.

Suggested change
return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
// For non-buffering endpoints, getMaxBufferSize() returns a negative value (e.g., -1).
// In that case, we always trigger a flush based on size as soon as there is staged data.
boolean sizeBasedFlush =
(maxBufferSize < 0) || (stagedWalSize >= maxBufferSize);
return sizeBasedFlush

|| (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs
>= source.getReplicationEndpoint().maxFlushInterval());
}

private void persistLogPosition() {
if (lastShippedBatch == null) {
return;
}
if (stagedWalSize > 0) {
source.getReplicationEndpoint().beforePersistingReplicationOffset();
}
stagedWalSize = 0;
lastStagedFlushTs = EnvironmentEdgeManager.currentTime();

// Clean up hfile references
for (Entry entry : entriesForCleanUpHFileRefs) {
try {
cleanUpHFileRefs(entry.getEdit());
} catch (IOException e) {
Copilot AI Jan 13, 2026
The error handling for IOException has been changed to catch and log the exception instead of propagating it. This silently suppresses IOException failures during cleanup, which could hide serious issues like file system problems. If cleanup failures should be non-fatal, this should be explicitly documented, or consider at least incrementing a failure metric to track these errors.

Suggested change
} catch (IOException e) {
} catch (IOException e) {
// Cleanup failures are intentionally treated as non-fatal: replication has already
// succeeded for these entries, so we log the failure and continue.

LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), e);
}
Comment on lines +265 to +268
Contributor
nit: will this be a behavior change? Previously, when cleanUpHFileRefs failed, the exception was thrown out through the function, but here we only log it.

LOG.trace("shipped entry {}: ", entry);
}
entriesForCleanUpHFileRefs.clear();

// Log and clean up WAL logs
updateLogPosition(lastShippedBatch);
}

private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
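To make the flush decision concrete, below is a small self-contained sketch that mirrors shouldPersistLogPosition() above, with the endpoint values passed in directly and with assumed sizes and timestamps. It illustrates the point in the review comment above: with the non-buffering defaults (getMaxBufferSize() == -1, maxFlushInterval() == Long.MAX_VALUE), any non-zero staged size triggers a persist, so the offset is still updated after every shipped batch, while a buffering endpoint persists only once the size threshold or the flush interval is reached.

public class FlushDecisionSketch {

  // Same decision as ReplicationSourceShipper#shouldPersistLogPosition(), minus the
  // lastShippedBatch null check, with the endpoint values as plain parameters.
  static boolean shouldPersist(long stagedWalSize, long lastFlushTs, long now,
    long maxBufferSize, long maxFlushInterval) {
    if (stagedWalSize == 0) {
      return false;
    }
    return stagedWalSize >= maxBufferSize || (now - lastFlushTs) >= maxFlushInterval;
  }

  public static void main(String[] args) {
    // Non-buffering endpoint: 512 >= -1, so it persists on every shipped batch.
    System.out.println(shouldPersist(512, 0, 1_000, -1, Long.MAX_VALUE)); // true
    // Buffering endpoint below both thresholds: keeps staging.
    System.out.println(shouldPersist(512, 0, 1_000, 128 * 1024 * 1024, 60_000)); // false
    // Buffering endpoint past the flush interval: persists even though little is staged.
    System.out.println(shouldPersist(512, 0, 61_000, 128 * 1024 * 1024, 60_000)); // true
  }
}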
TestReplicationSourceShipperBufferedFlush.java
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
* Tests staged WAL flush behavior in ReplicationSourceShipper.
*/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceShipperBufferedFlush {

static class CountingReplicationEndpoint extends BaseReplicationEndpoint {

private final AtomicInteger beforePersistCalls = new AtomicInteger();

@Override
public void start() {
startAsync().awaitRunning();
}

@Override
public void stop() {
stopAsync().awaitTerminated();
}

@Override
protected void doStart() {
notifyStarted();
}

@Override
protected void doStop() {
notifyStopped();
}

@Override
public boolean replicate(ReplicateContext ctx) {
return true;
}

@Override
public void beforePersistingReplicationOffset() {
beforePersistCalls.incrementAndGet();
}

@Override
public long getMaxBufferSize() {
return 1L; // force immediate flush
}

@Override
public long maxFlushInterval() {
return Long.MAX_VALUE;
}

@Override
public UUID getPeerUUID() {
return null;
}

int getBeforePersistCalls() {
return beforePersistCalls.get();
}
}

@Test
public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
Configuration conf = HBaseConfiguration.create();

CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
endpoint.start();

ReplicationSource source = Mockito.mock(ReplicationSource.class);
ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

Mockito.when(source.isPeerEnabled()).thenReturn(true);
Mockito.when(source.isSourceActive()).thenReturn(true);
Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
Mockito.when(source.getPeerId()).thenReturn("1");
Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));

WALEntryBatch batch = new WALEntryBatch(1, null);
batch.setLastWalPath(new Path("wal"));
batch.setLastWalPosition(1L);
// no entries, no heap size

Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
Copilot AI Jan 13, 2026
The mock is configured to stub 'walReader.take()' but the ReplicationSourceShipper actually calls 'entryReader.poll(getEntriesTimeout)' (line 118 in ReplicationSourceShipper.java). This means the mock configuration has no effect, and the test will call the unmocked poll() method which will return null, causing the shipper to skip processing any batches. The test should mock 'poll(anyInt())' instead of 'take()'.

Suggested change
Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);


ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, "wal-group", source, walReader);

shipper.start();

// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);

shipper.interrupt();
Comment on lines +123 to +126
Copilot AI Jan 13, 2026
The test uses a fixed wait time of 3000ms with an empty lambda that always returns true (Waiter.waitFor(conf, 3000, () -> true)). This just sleeps unconditionally and doesn't actually verify any condition. The test should wait for a meaningful condition, such as verifying that the shipper has processed the batch or checking that the thread has reached a specific state. This makes the test timing-dependent and unreliable.

Suggested change
// Allow loop to run
Waiter.waitFor(conf, 3000, () -> true);
shipper.interrupt();
// Wait until the shipper thread has finished processing the batch
Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());

shipper.join();

assertEquals(0, endpoint.getBeforePersistCalls());
}
}