diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index bd55a2b76d0514..30817cee0bbb22 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -939,6 +939,17 @@ FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheC return nullptr; /// Empty files are not cached. } + VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string() + << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type) + << " expiration_time=" << context.expiration_time + << " tablet_id=" << context.tablet_id; + + if (size > 1024 * 1024 * 1024) { + LOG(WARNING) << "File block size is too large for a block. size=" << size + << " hash=" << hash.to_string() << " offset=" << offset + << " stack:" << get_stack_trace(); + } + auto& offsets = _files[hash]; auto itr = offsets.find(offset); if (itr != offsets.end()) { @@ -1393,10 +1404,10 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size if (cell->queue_iterator) { auto& queue = get_queue(cell->file_block->cache_type()); DCHECK(queue.contains(hash, offset, cache_lock)); - auto iter = queue.get(hash, offset, cache_lock); - iter->size = new_size; - queue.cache_size -= old_size; - queue.cache_size += new_size; + queue.resize(*cell->queue_iterator, new_size, cache_lock); + _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE, + cell->file_block->get_hash_value(), + cell->file_block->offset(), new_size); } _cur_cache_size -= old_size; _cur_cache_size += new_size; @@ -1691,6 +1702,13 @@ void LRUQueue::remove_all(std::lock_guard& /* cache_lock */) { void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard& /* cache_lock */) { queue.splice(queue.end(), queue, queue_it); } + +void LRUQueue::resize(Iterator queue_it, size_t new_size, + std::lock_guard& /* cache_lock */) { + cache_size -= queue_it->size; + queue_it->size = new_size; + cache_size += new_size; +} bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, std::lock_guard& /* cache_lock */) const { return map.find(std::make_pair(hash, offset)) != map.end(); diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 090c219236b59d..417f68ecc978dc 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -224,6 +224,8 @@ class LRUQueue { void move_to_end(Iterator queue_it, std::lock_guard& cache_lock); + void resize(Iterator queue_it, size_t new_size, std::lock_guard& cache_lock); + std::string to_string(std::lock_guard& cache_lock) const; bool contains(const UInt128Wrapper& hash, size_t offset, diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 3df56973af7149..87db5bca861994 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -653,6 +653,31 @@ Status FSFileCacheStorage::parse_filename_suffix_to_cache_type( return Status::OK(); } +bool FSFileCacheStorage::handle_already_loaded_block( + BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, size_t new_size, + int64_t tablet_id, std::lock_guard& cache_lock) const { + auto file_it = mgr->_files.find(hash); + if (file_it == mgr->_files.end()) { + return false; + } + + auto cell_it = file_it->second.find(offset); + if (cell_it == file_it->second.end()) { + return false; + } + + auto block = cell_it->second.file_block; + if (tablet_id != 0 && block->tablet_id() == 0) { + block->set_tablet_id(tablet_id); + } + + size_t old_size = block->range().size(); + if (old_size != new_size) { + mgr->reset_range(hash, offset, old_size, new_size, cache_lock); + } + return true; +} + void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const { int scan_length = 10000; std::vector batch_load_buffer; @@ -662,8 +687,8 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const auto f = [&](const BatchLoadArgs& args) { // in async load mode, a cell may be added twice. - if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) { - // TODO(zhengyu): update type&expiration if need + if (handle_already_loaded_block(_mgr, args.hash, args.offset, args.size, + args.ctx.tablet_id, cache_lock)) { return; } // if the file is tmp, it means it is the old file and it should be removed diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h index 114517bdf72706..21a782d7e86a98 100644 --- a/be/src/io/cache/fs_file_cache_storage.h +++ b/be/src/io/cache/fs_file_cache_storage.h @@ -88,6 +88,10 @@ class FSFileCacheStorage : public FileCacheStorage { FileCacheStorageType get_type() override { return DISK; } + bool handle_already_loaded_block(BlockFileCache* mgr, const UInt128Wrapper& hash, size_t offset, + size_t new_size, int64_t tablet_id, + std::lock_guard& cache_lock) const; + private: void remove_old_version_directories(); diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 8308a2a73ad6e3..587b1f840d589b 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -62,6 +62,24 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { } break; } + case CacheLRULogType::RESIZE: { + auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); + if (it != std::list::iterator()) { + shadow_queue.resize(it, log->size, lru_log_lock); + } else { + LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue"; + } + break; + } + case CacheLRULogType::RESIZE: { + auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); + if (it != std::list::iterator()) { + shadow_queue.resize(it, log->size, lru_log_lock); + } else { + LOG(WARNING) << "RESIZE failed, doesn't exist in shadow queue"; + } + break; + } default: LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast(log->type); break; diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index 1f6d69493cf4a8..5bd68b70d555f9 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -31,7 +31,8 @@ enum class CacheLRULogType { ADD = 0, // all of the integer types REMOVE = 1, MOVETOBACK = 2, - INVALID = 3, + RESIZE = 3, + INVALID = 4, }; struct CacheLRULog {