HBASE-29823 Control WAL flush and offset persistence from ReplicationSourceShipper #7617
base: master
In ReplicationEndpoint, three new default hooks let an endpoint control buffering and offset persistence:

```java
@@ -283,4 +283,29 @@ public int getTimeout() {
   * @throws IllegalStateException if this service's state isn't FAILED.
   */
  Throwable failureCause();

  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
  // ContinuousBackupReplicationEndpoint
  // and -1 for other ReplicationEndpoint since they don't buffer.
  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
  default long getMaxBufferSize() {
    return -1;
  }

  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
  // file. So we return config value CONF_STAGED_WAL_FLUSH_INTERVAL for
  // ContinuousBackupReplicationEndpoint
  // and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
  default long maxFlushInterval() {
    return Long.MAX_VALUE;
  }

  // Used in ContinuousBackupReplicationEndpoint to flush/close WAL backup files
  default void beforePersistingReplicationOffset() {
  }
}
```
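For context, a buffering endpoint is expected to override all three hooks. The sketch below is illustrative only, not the actual ContinuousBackupReplicationEndpoint from this PR; the class name and constant values are invented here and merely stand in for the CONF_BACKUP_MAX_WAL_SIZE and CONF_STAGED_WAL_FLUSH_INTERVAL settings mentioned in the comments above.

```java
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

// Illustrative sketch of a buffering endpoint (hypothetical class, not part of the PR).
// Declared abstract so the unrelated lifecycle/replicate methods can be omitted here.
public abstract class BufferingBackupEndpointSketch extends BaseReplicationEndpoint {

  // Placeholder values; a real endpoint would read these from configuration.
  private static final long MAX_BUFFER_BYTES = 128L * 1024 * 1024;
  private static final long FLUSH_INTERVAL_MS = 5L * 60 * 1000;

  @Override
  public long getMaxBufferSize() {
    // Ask the shipper to flush once this many bytes of WAL entries are staged.
    return MAX_BUFFER_BYTES;
  }

  @Override
  public long maxFlushInterval() {
    // Also flush on a timer, so a slow trickle of edits still gets persisted.
    return FLUSH_INTERVAL_MS;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Flush/close the staged WAL backup file here, so the persisted offset never
    // runs ahead of data that is actually durable in the backup.
  }
}
```

Non-buffering endpoints keep the defaults, which make the shipper persist the replication offset after every shipped batch, matching the existing behavior.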
In ReplicationSourceShipper, the shipper tracks the staged WAL state with new fields and checks in its run loop whether a flush is due:

```java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

@@ -74,6 +75,10 @@ public enum WorkerState {
  private final int DEFAULT_TIMEOUT = 20000;
  private final int getEntriesTimeout;
  private final int shipEditsTimeout;
  private long stagedWalSize = 0L;
  private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
  private WALEntryBatch lastShippedBatch;
  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

  public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
    ReplicationSourceWALReader walReader) {

@@ -98,6 +103,10 @@ public final void run() {
    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
    // Loop until we close down
    while (isActive()) {
      // check if flush needed for WAL backup, this is need for timeout based flush
```
A review suggestion fixes a typo in that new comment:

```diff
-      // check if flush needed for WAL backup, this is need for timeout based flush
+      // check if flush needed for WAL backup, this is needed for timeout based flush
```
Copilot AI (Jan 13, 2026):

The condition checks if stagedWalSize is greater than or equal to getMaxBufferSize(), but getMaxBufferSize() can return -1 for non-buffering endpoints (as documented in ReplicationEndpoint). This means the comparison `stagedWalSize >= -1` would always be true when stagedWalSize > 0, causing immediate flushes for non-buffering endpoints. While this preserves existing behavior, the intent is unclear and could be confusing. Consider explicitly checking for -1 to make the logic more explicit.

```diff
-    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
+    long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
+    // For non-buffering endpoints, getMaxBufferSize() returns a negative value (e.g., -1).
+    // In that case, we always trigger a flush based on size as soon as there is staged data.
+    boolean sizeBasedFlush =
+      (maxBufferSize < 0) || (stagedWalSize >= maxBufferSize);
+    return sizeBasedFlush
```
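Putting that suggestion together with the time-based side implied by maxFlushInterval(), the combined check could look roughly like the sketch below. The field and method names follow the diff and the suggestion above; this is not the PR's exact code (a review comment further down notes the method is actually named shouldPersistLogPosition()).

```java
// Sketch only: a flush of the staged WAL (and offset persistence) is due when either
// the endpoint's size threshold or its time threshold has been reached.
private boolean shouldFlushStagedWal() {
  long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
  long maxFlushInterval = source.getReplicationEndpoint().maxFlushInterval();
  // Negative max buffer size (the non-buffering default of -1) means "flush as soon as
  // anything is staged", preserving the old persist-after-every-batch behavior.
  boolean sizeBasedFlush = maxBufferSize < 0 || stagedWalSize >= maxBufferSize;
  boolean timeBasedFlush =
    EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= maxFlushInterval;
  return stagedWalSize > 0 && (sizeBasedFlush || timeBasedFlush);
}
```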
Copilot AI (Jan 13, 2026):

The error handling for IOException has been changed to catch and log the exception instead of propagating it. This silently suppresses IOException failures during cleanup, which could hide serious issues like file system problems. If cleanup failures should be non-fatal, this should be explicitly documented, or consider at least incrementing a failure metric to track these errors.

```diff
       } catch (IOException e) {
+        // Cleanup failures are intentionally treated as non-fatal: replication has already
+        // succeeded for these entries, so we log the failure and continue.
```
A related review comment:

nit: will this be a behavior change? Previously, when cleanUpHFileRefs failed, the exception was thrown through the function, but here we are only logging it.
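If the intent really is to make this cleanup non-fatal, one way to follow the suggestion of keeping failures visible is to both log and count them. The sketch below is illustrative only; the metrics call is hypothetical (and therefore commented out), since MetricsSource does not necessarily expose such a counter.

```java
    } catch (IOException e) {
      // Treat cleanup failures as non-fatal: the entries were already replicated, so log
      // the problem and keep the shipper running instead of failing the whole source.
      LOG.warn("Failed to clean up hfile references for peer {}", source.getPeerId(), e);
      // Hypothetical metric hook; a counter like this would have to be added to MetricsSource:
      // source.getSourceMetrics().incrFailedHFileRefsCleanUp();
    }
```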
The PR also adds a new test, TestReplicationSourceShipperBufferedFlush, shown here up to the point the review comments below refer to:

```java
@@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
 * Tests staged WAL flush behavior in ReplicationSourceShipper.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceShipperBufferedFlush {

  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {

    private final AtomicInteger beforePersistCalls = new AtomicInteger();

    @Override
    public void start() {
      startAsync().awaitRunning();
    }

    @Override
    public void stop() {
      stopAsync().awaitTerminated();
    }

    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      notifyStopped();
    }

    @Override
    public boolean replicate(ReplicateContext ctx) {
      return true;
    }

    @Override
    public void beforePersistingReplicationOffset() {
      beforePersistCalls.incrementAndGet();
    }

    @Override
    public long getMaxBufferSize() {
      return 1L; // force immediate flush
    }

    @Override
    public long maxFlushInterval() {
      return Long.MAX_VALUE;
    }

    @Override
    public UUID getPeerUUID() {
      return null;
    }

    int getBeforePersistCalls() {
      return beforePersistCalls.get();
    }
  }

  @Test
  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
    endpoint.start();

    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

    Mockito.when(source.isPeerEnabled()).thenReturn(true);
    Mockito.when(source.isSourceActive()).thenReturn(true);
    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
    Mockito.when(source.getPeerId()).thenReturn("1");
    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));

    WALEntryBatch batch = new WALEntryBatch(1, null);
    batch.setLastWalPath(new Path("wal"));
    batch.setLastWalPosition(1L);
    // no entries, no heap size

    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
```
A review suggestion switches the reader stub from take() to poll():

```diff
-    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+    Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);
```
Copilot AI (Jan 13, 2026):

The test uses a fixed wait time of 3000ms with an empty lambda that always returns true (Waiter.waitFor(conf, 3000, () -> true)). This just sleeps unconditionally and doesn't actually verify any condition. The test should wait for a meaningful condition, such as verifying that the shipper has processed the batch or checking that the thread has reached a specific state. This makes the test timing-dependent and unreliable.

```diff
-    // Allow loop to run
-    Waiter.waitFor(conf, 3000, () -> true);
     shipper.interrupt();
+    // Wait until the shipper thread has finished processing the batch
+    Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());
```
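One more option for making the wait meaningful, alongside the `!shipper.isAlive()` check above: Mockito's timeout-based verification can block until the shipper has actually pulled the batch from the reader mock. This is a sketch that assumes the reader is stubbed through `poll(...)` as in the earlier suggestion; it only guarantees the batch was dequeued, so waiting for the thread to exit remains the stronger condition.

```java
// Block (up to 3s) until the shipper thread has dequeued at least one batch from the
// mocked reader; the verification fails the test if that never happens.
Mockito.verify(walReader, Mockito.timeout(3000).atLeastOnce()).poll(Mockito.anyInt());
```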
One more review comment:

The comment references `shouldFlushStagedWal()`, but the actual method name in ReplicationSourceShipper is `shouldPersistLogPosition()`. This inconsistency will confuse developers trying to understand the interaction between these components.