diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c92..1032c7b82196 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before they are flushed to a
+  // WAL backup file, so that endpoint returns the configured CONF_BACKUP_MAX_WAL_SIZE value.
+  // Other ReplicationEndpoint implementations do not buffer, so they return -1 and the
+  // replication offset is updated every time a WALEntryBatch is shipped.
+  // See ReplicationSourceShipper#shouldPersistLogPosition() and
+  // ReplicationSourceShipper#persistLogPosition().
+  default long getMaxBufferSize() {
+    return -1;
+  }
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before they are flushed to a
+  // WAL backup file, so that endpoint returns the configured CONF_STAGED_WAL_FLUSH_INTERVAL value.
+  // Other ReplicationEndpoint implementations do not buffer, so they return Long.MAX_VALUE and
+  // the replication offset is updated every time a WALEntryBatch is shipped.
+  // See ReplicationSourceShipper#shouldPersistLogPosition() and
+  // ReplicationSourceShipper#persistLogPosition().
+  default long maxFlushInterval() {
+    return Long.MAX_VALUE;
+  }
+
+  // Used by ContinuousBackupReplicationEndpoint to flush/close WAL backup files before the
+  // replication offset is persisted.
+  default void beforePersistingReplicationOffset() {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 4709e607fc70..fe14c992a974 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -74,6 +75,10 @@ public enum WorkerState {
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
+  private long stagedWalSize = 0L;
+  private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+  private WALEntryBatch lastShippedBatch;
+  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
     ReplicationSourceWALReader walReader) {
@@ -98,6 +103,10 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // Check if a staged-WAL flush is due; this handles the timeout-based flush for WAL backup.
+      if (shouldPersistLogPosition()) {
+        persistLogPosition();
+      }
       // Sleep until replication is enabled again
       if (!source.isPeerEnabled()) {
         // The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +164,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
+      lastShippedBatch = entryBatch;
+      persistLogPosition();
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +200,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
         } else {
           sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
         }
-        // Clean up hfile references
-        for (Entry entry : entries) {
-          cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+
+        stagedWalSize += currentSize;
+        entriesForCleanUpHFileRefs.addAll(entries);
+        lastShippedBatch = entryBatch;
+        if (shouldPersistLogPosition()) {
+          persistLogPosition();
         }
-        // Log and clean up WAL logs
-        updateLogPosition(entryBatch);
 
         // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         // this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs
+        >= source.getReplicationEndpoint().maxFlushInterval());
+  }
+
+  private void persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return;
+    }
+    if (stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      try {
+        cleanUpHFileRefs(entry.getEdit());
+      } catch (IOException e) {
+        LOG.warn("{} threw unknown exception:",
+          source.getReplicationEndpoint().getClass().getName(), e);
+      }
+      LOG.trace("shipped entry {}: ", entry);
+    }
+    entriesForCleanUpHFileRefs.clear();
+
+    // Log and clean up WAL logs
+    updateLogPosition(lastShippedBatch);
+  }
+
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java
new file mode 100644
index 000000000000..66480f8dacb8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Tests staged WAL flush behavior in ReplicationSourceShipper.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationSourceShipperBufferedFlush {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationSourceShipperBufferedFlush.class);
+
+  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private final AtomicInteger beforePersistCalls = new AtomicInteger();
+
+    @Override
+    public void start() {
+      startAsync().awaitRunning();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync().awaitTerminated();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext ctx) {
+      return true;
+    }
+
+    @Override
+    public void beforePersistingReplicationOffset() {
+      beforePersistCalls.incrementAndGet();
+    }
+
+    @Override
+    public long getMaxBufferSize() {
+      return 1L; // force an immediate flush once anything is staged
+    }
+
+    @Override
+    public long maxFlushInterval() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return null;
+    }
+
+    int getBeforePersistCalls() {
+      return beforePersistCalls.get();
+    }
+  }
+
+  @Test
+  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
+    endpoint.start();
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);
+
+    Mockito.when(source.isPeerEnabled()).thenReturn(true);
+    Mockito.when(source.isSourceActive()).thenReturn(true);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
+    Mockito.when(source.getPeerId()).thenReturn("1");
+    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));
+
+    WALEntryBatch batch = new WALEntryBatch(1, null);
+    batch.setLastWalPath(new Path("wal"));
+    batch.setLastWalPosition(1L);
+    // no entries, no heap size
+
+    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, "wal-group", source, walReader);
+
+    shipper.start();
+
+    // Let the shipper loop process the empty batch; do not fail if the flush never fires.
+    Waiter.waitFor(conf, 3000, 100, false, () -> endpoint.getBeforePersistCalls() > 0);
+
+    shipper.interrupt();
+    shipper.join();
+
+    assertEquals(0, endpoint.getBeforePersistCalls());
+  }
+}
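
For illustration only (not part of the patch): a minimal sketch of how a buffering endpoint could plug into the new hooks. Per the shipper changes above, stagedWalSize accumulates per shipped batch, shouldPersistLogPosition() is checked both after shipping and at the top of the run loop, and persistLogPosition() invokes beforePersistingReplicationOffset() before updateLogPosition(). The class name StagedWalBackupEndpoint, the threshold constants, and the flushStagedEntries() helper below are hypothetical; only the three overridden default methods come from the ReplicationEndpoint interface.

// Illustrative sketch only; not part of the patch. Class name, constants and the
// flushStagedEntries() helper are hypothetical.
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class StagedWalBackupEndpoint extends BaseReplicationEndpoint {

  // Illustrative thresholds; the real ContinuousBackupReplicationEndpoint is described as reading
  // CONF_BACKUP_MAX_WAL_SIZE and CONF_STAGED_WAL_FLUSH_INTERVAL from its configuration.
  private static final long MAX_BUFFER_SIZE = 128L * 1024 * 1024; // 128 MB
  private static final long FLUSH_INTERVAL_MS = 5L * 60 * 1000; // 5 minutes

  private final List<Entry> staged = new ArrayList<>();

  @Override
  public UUID getPeerUUID() {
    return UUID.randomUUID();
  }

  @Override
  public void start() {
    startAsync().awaitRunning();
  }

  @Override
  public void stop() {
    stopAsync().awaitTerminated();
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }

  @Override
  public boolean replicate(ReplicateContext context) {
    // Stage the entries locally instead of shipping them to a remote cluster.
    staged.addAll(context.getEntries());
    return true;
  }

  @Override
  public long getMaxBufferSize() {
    // ReplicationSourceShipper persists the replication offset once this many bytes are staged.
    return MAX_BUFFER_SIZE;
  }

  @Override
  public long maxFlushInterval() {
    // The shipper also persists the offset once this much time has passed since the last flush.
    return FLUSH_INTERVAL_MS;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Called by persistLogPosition() before updateLogPosition(): make the staged entries durable
    // first, so the persisted offset never points past data that has not been backed up yet.
    flushStagedEntries();
  }

  private void flushStagedEntries() {
    // Hypothetical: write 'staged' to a WAL backup file, then clear the buffer.
    staged.clear();
  }
}

The ordering is the point of the hooks: the endpoint gets a chance to make its buffered entries durable before the shipper persists the replication offset, so a crash cannot leave a recorded position ahead of the data actually written to the backup file.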