Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,17 @@ FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheC
return nullptr; /// Empty files are not cached.
}

VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
<< " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
<< " expiration_time=" << context.expiration_time
<< " tablet_id=" << context.tablet_id;

if (size > 1024 * 1024 * 1024) {
LOG(WARNING) << "File block size is too large for a block. size=" << size
<< " hash=" << hash.to_string() << " offset=" << offset
<< " stack:" << get_stack_trace();
}

auto& offsets = _files[hash];
auto itr = offsets.find(offset);
if (itr != offsets.end()) {
Expand Down Expand Up @@ -1393,10 +1404,10 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size
if (cell->queue_iterator) {
auto& queue = get_queue(cell->file_block->cache_type());
DCHECK(queue.contains(hash, offset, cache_lock));
auto iter = queue.get(hash, offset, cache_lock);
iter->size = new_size;
queue.cache_size -= old_size;
queue.cache_size += new_size;
queue.resize(*cell->queue_iterator, new_size, cache_lock);
_lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
cell->file_block->get_hash_value(),
cell->file_block->offset(), new_size);
}
_cur_cache_size -= old_size;
_cur_cache_size += new_size;
Expand Down Expand Up @@ -1691,6 +1702,13 @@ void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
/// Relocates the entry referenced by `queue_it` to the back of `queue`.
/// The lock parameter is unnamed and is not used by the body.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    const auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}

/// Changes the recorded size of the entry at `queue_it` to `new_size` and keeps
/// the queue-wide `cache_size` total in step with it.
/// The lock parameter is unnamed and is not used by the body.
void LRUQueue::resize(Iterator queue_it, size_t new_size,
                      std::lock_guard<std::mutex>& /* cache_lock */) {
    auto& entry = *queue_it;
    // Remove the entry's previous contribution before overwriting its size.
    cache_size -= entry.size;
    entry.size = new_size;
    // Add the updated contribution back to the total.
    cache_size += new_size;
}
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock */) const {
return map.find(std::make_pair(hash, offset)) != map.end();
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class LRUQueue {

void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& cache_lock);

// Sets the size stored in the entry at queue_it to new_size and updates the
// queue's cache_size total by removing the old size and adding the new one.
void resize(Iterator queue_it, size_t new_size, std::lock_guard<std::mutex>& cache_lock);

std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;

bool contains(const UInt128Wrapper& hash, size_t offset,
Expand Down
29 changes: 27 additions & 2 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,31 @@ Status FSFileCacheStorage::parse_filename_suffix_to_cache_type(
return Status::OK();
}

/// Checks whether a cell for (hash, offset) is already present in `mgr->_files`
/// and, if it is, refreshes that block's metadata in place.
///
/// When the cell exists:
///   - a non-zero `tablet_id` is stored on the block if the block's tablet id is 0;
///   - `mgr->reset_range` is called when the block's current range size differs
///     from `new_size`.
///
/// @return true if the cell was found (the caller skips adding it again),
///         false if either the hash or the offset is not registered.
bool FSFileCacheStorage::handle_already_loaded_block(
        BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, size_t new_size,
        int64_t tablet_id, std::lock_guard<std::mutex>& cache_lock) const {
    auto& files = mgr->_files;
    const auto file_pos = files.find(hash);
    if (file_pos == files.end()) {
        return false;
    }

    auto& cells_by_offset = file_pos->second;
    const auto cell_pos = cells_by_offset.find(offset);
    if (cell_pos == cells_by_offset.end()) {
        return false;
    }

    auto cached_block = cell_pos->second.file_block;

    // Only fill in a tablet id when we have one and the block does not yet.
    const bool needs_tablet_id = tablet_id != 0 && cached_block->tablet_id() == 0;
    if (needs_tablet_id) {
        cached_block->set_tablet_id(tablet_id);
    }

    // Bring the in-memory range in line with the size reported by the caller.
    const size_t current_size = cached_block->range().size();
    if (current_size != new_size) {
        mgr->reset_range(hash, offset, current_size, new_size, cache_lock);
    }
    return true;
}

void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const {
int scan_length = 10000;
std::vector<BatchLoadArgs> batch_load_buffer;
Expand All @@ -662,8 +687,8 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const

auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) {
// TODO(zhengyu): update type&expiration if need
if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size,
args.ctx.tablet_id, cache_lock)) {
return;
}
// if the file is tmp, it means it is the old file and it should be removed
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class FSFileCacheStorage : public FileCacheStorage {

FileCacheStorageType get_type() override { return DISK; }

// Returns true if a cell for (hash, offset) already exists in mgr->_files, false otherwise.
// When it exists, the block's tablet id is set to tablet_id if tablet_id is non-zero and the
// block's tablet id is 0, and mgr->reset_range is called if the block's range size differs
// from new_size.
bool handle_already_loaded_block(BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset,
size_t new_size, int64_t tablet_id,
std::lock_guard<std::mutex>& cache_lock) const;

private:
void remove_old_version_directories();

Expand Down
18 changes: 18 additions & 0 deletions be/src/io/cache/lru_queue_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) {
}
break;
}
case CacheLRULogType::RESIZE: {
auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock);
if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
shadow_queue.resize(it, log->size, lru_log_lock);
} else {
LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue";
}
break;
}
case CacheLRULogType::RESIZE: {
auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock);
if (it != std::list<LRUQueue::FileKeyAndOffset>::iterator()) {
shadow_queue.resize(it, log->size, lru_log_lock);
} else {
LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue";
}
break;
}
default:
LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast<int>(log->type);
break;
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/lru_queue_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ enum class CacheLRULogType {
ADD = 0, // all of the integer types
REMOVE = 1,
MOVETOBACK = 2,
INVALID = 3,
RESIZE = 3,
INVALID = 4,
};

struct CacheLRULog {
Expand Down