@@ -57,7 +57,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -229,8 +228,6 @@ protected enum CacheState {
/** Cache access count (sequential ID) */
private final AtomicLong accessCount = new AtomicLong();

private static final int DEFAULT_CACHE_WAIT_TIME = 50;

private final BucketCacheStats cacheStats;
private final String persistencePath;
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
@@ -2385,34 +2382,34 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
fileName, totalBlockCount, dataBlockCount);
try {
final MutableInt count = new MutableInt();
int count = 0;
LOG.debug("iterating over {} entries in the backing map", backingMap.size());
Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE);
if (result.isEmpty() && StoreFileInfo.isReference(fileName)) {
result = getAllCacheKeysForFile(
StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0,
Long.MAX_VALUE);
}
result.stream().forEach(entry -> {
for (BlockCacheKey entry : result) {
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
fileName.getName(), entry.getOffset());
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset());
lock.readLock().lock();
locks.add(lock);
if (backingMap.containsKey(entry) && entry.getBlockType().isData()) {
count.increment();
count++;
}
});
}
// BucketCache would only have data blocks
if (dataBlockCount == count.getValue()) {
if (dataBlockCount == count) {
LOG.debug("File {} has now been fully cached.", fileName);
fileCacheCompleted(fileName, size);
} else {
LOG.debug(
"Prefetch executor completed for {}, but only {} data blocks were cached. "
+ "Total data blocks for file: {}. "
+ "Checking for blocks pending cache in cache writer queue.",
fileName, count.getValue(), dataBlockCount);
fileName, count, dataBlockCount);
if (ramCache.hasBlocksForFile(fileName.getName())) {
for (ReentrantReadWriteLock lock : locks) {
lock.readLock().unlock();
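
A note on the hunk above: with the result.stream().forEach(...) lambda rewritten as a plain for loop, the commons-lang3 MutableInt wrapper (whose import is dropped in the first hunk) is no longer needed, because only a lambda requires a captured local counter to be effectively final. The following is a minimal, self-contained sketch of that distinction, using placeholder string keys instead of the real BlockCacheKey entries:

import java.util.List;

public class CounterSketch {
  // Hypothetical stand-in for the data-block check done against the backing map.
  static boolean isDataBlock(String key) {
    return key.startsWith("data");
  }

  public static void main(String[] args) {
    List<String> keys = List.of("data-0", "meta-1", "data-2");

    // keys.forEach(k -> { if (isDataBlock(k)) count++; });
    // would not compile: a lambda may only read an effectively final local,
    // which is why the old code wrapped the counter in a MutableInt.
    int count = 0;
    for (String k : keys) { // a plain loop can increment an ordinary local
      if (isDataBlock(k)) {
        count++;
      }
    }
    System.out.println(count); // prints 2
  }
}
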
@@ -2507,16 +2504,19 @@ boolean isCacheInitialized(String api) {

@Override
public boolean waitForCacheInitialization(long timeout) {
try {
while (cacheState == CacheState.INITIALIZING) {
if (timeout <= 0) {
break;
}
while (cacheState == CacheState.INITIALIZING) {
if (timeout <= 0) {
break;
}
try {
Thread.sleep(100);
timeout -= 100;
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for cache initialization", e);
Thread.currentThread().interrupt();
break;
}
} finally {
return isCacheEnabled();
timeout -= 100;
}
return isCacheEnabled();
}
}
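
Taken together, the rewritten waitForCacheInitialization polls the cache state with the sleep and its InterruptedException handling moved inside the loop, and the result returned normally at the end rather than from a finally block as in the removed version above (a return inside finally discards any pending exception). A standalone sketch of the resulting shape, with a volatile flag standing in for the real CacheState field and isCacheEnabled() check:

public class WaitSketch {
  private volatile boolean initializing = true; // placeholder for cacheState == CacheState.INITIALIZING

  boolean isEnabled() { // placeholder for isCacheEnabled()
    return !initializing;
  }

  public boolean waitForInit(long timeout) {
    while (initializing) {
      if (timeout <= 0) {
        break;
      }
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status
        break; // stop waiting instead of looping again
      }
      timeout -= 100;
    }
    return isEnabled();
  }
}
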
@@ -46,7 +46,6 @@
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -138,7 +137,8 @@ public void testRetrieveFromFile() throws Exception {
recoveredBucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
waitPersistentCacheValidation(conf, bucketCache);
assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
waitPersistentCacheValidation(conf, recoveredBucketCache);
@mnpoonia (Contributor) commented on Jan 11, 2026:

Is this the main fix? We were using bucketCache, which is deleted, instead of recoveredBucketCache?

The PR author (Contributor) replied:

Yes, just a typo... We should use recoveredBucketCache instead of bucketCache here...
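
For clarity, the corrected sequence from the hunk above: the initialization wait and the persistent-cache validation now both target the newly constructed recoveredBucketCache rather than the earlier bucketCache instance the reviewer flagged as already deleted. (Fragment only; capacitySize, constructedBlockSize, and the other arguments are the test's existing locals.)

recoveredBucketCache =
  new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
    constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
waitPersistentCacheValidation(conf, recoveredBucketCache);
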

assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
assertEquals(0, recoveredBucketCache.backingMap.size());
BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
@@ -308,8 +308,6 @@ public void testModifiedBucketCacheFileTime() throws Exception {
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);

Pair<String, Long> myPair = new Pair<>();

CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks