Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
.mtime = fd.mtime,
};
if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
state->query_options().enable_file_cache) {
state->query_options().__isset.enable_file_cache_external_catalog &&
state->query_options().enable_file_cache_external_catalog) {
opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
}
if (state != nullptr && state->query_options().__isset.file_cache_base_path &&
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.target_cast_type_for_variants = _read_context->target_cast_type_for_variants;
if (_read_context->runtime_state != nullptr) {
_read_options.io_ctx.query_id = &_read_context->runtime_state->query_id();
_read_options.io_ctx.read_file_cache =
_read_context->runtime_state->query_options().enable_file_cache;
_read_options.io_ctx.is_disposable =
_read_context->runtime_state->query_options().disable_file_cache;
!_read_context->runtime_state->query_options().enable_file_cache_olap_table;
}

if (_read_context->condition_cache_digest) {
Expand Down
11 changes: 6 additions & 5 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,

_timeout_second = query_options.execution_timeout;

bool initialize_context_holder =
config::enable_file_cache && config::enable_file_cache_query_limit &&
query_options.__isset.enable_file_cache && query_options.enable_file_cache &&
query_options.__isset.file_cache_query_limit_percent &&
query_options.file_cache_query_limit_percent < 100;
bool initialize_context_holder = config::enable_file_cache &&
config::enable_file_cache_query_limit &&
!(query_options.query_type == TQueryType::EXTERNAL &&
!query_options.enable_file_cache_external_catalog) &&
query_options.__isset.file_cache_query_limit_percent &&
query_options.file_cache_query_limit_percent < 100;

// Initialize file cache context holders
if (initialize_context_holder) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts
RETURN_IF_ERROR(_init_io_ctx());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->file_reader_stats = _file_reader_stats.get();
_io_ctx->is_disposable = _state->query_options().disable_file_cache;
_io_ctx->is_disposable = !_state->query_options().enable_file_cache_external_catalog;

if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
Expand Down Expand Up @@ -1808,7 +1808,7 @@ void FileScanner::update_realtime_counters() {

void FileScanner::_collect_profile_before_close() {
Scanner::_collect_profile_before_close();
if (config::enable_file_cache && _state->query_options().enable_file_cache &&
if (config::enable_file_cache && _state->query_options().enable_file_cache_external_catalog &&
_profile != nullptr) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public abstract class ExternalScanNode extends ScanNode {
protected boolean needCheckColumnPriv;

protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null
&& (ConnectContext.get().getSessionVariable().enableFileCache
&& (ConnectContext.get().getSessionVariable().enableFileCacheExternalCatalog
|| ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan()))
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
: new FederationBackendPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9134,14 +9134,14 @@ public LogicalPlan visitWarmUpSelect(DorisParser.WarmUpSelectContext ctx) {

LogicalProject project = new LogicalProject(projectList, filter);

if (Config.isNotCloudMode() && (!ConnectContext.get().getSessionVariable().isEnableFileCache())) {
if (Config.isNotCloudMode() && !ConnectContext.get().getSessionVariable().isEnableFileCacheExternalCatalog()) {
throw new AnalysisException("WARM UP SELECT requires session variable"
+ " enable_file_cache=true");
+ " enable_file_cache_external_catalog=true");
}

if (Config.isCloudMode() && ConnectContext.get().getSessionVariable().isDisableFileCache()) {
if (Config.isCloudMode() && !ConnectContext.get().getSessionVariable().isEnableFileCacheOlapTable()) {
throw new AnalysisException("WARM UP SELECT requires session variable"
+ " disable_file_cache=false in cloud mode");
+ " enable_file_cache_olap_table=true in cloud mode");
}

UnboundBlackholeSink<?> sink = new UnboundBlackholeSink<>(project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou
tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
tSink.setSendBatchParallelism(sendBatchParallelism);
tSink.setWriteFileCache(ConnectContext.get() != null
? !ConnectContext.get().getSessionVariable().isDisableFileCache()
? ConnectContext.get().getSessionVariable().isEnableFileCacheOlapTable()
: false);
this.isStrictMode = isStrictMode;
this.txnId = txnId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public NereidsCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTab
descTable, timezone, loadZeroTolerance, enableProfile
);
// same reason in `setForInsert`
this.coordinatorContext.queryOptions.setDisableFileCache(true);
this.coordinatorContext.queryOptions.setEnableFileCacheOlapTable(false);
this.needEnqueue = false;

Preconditions.checkState(!fragments.isEmpty()
Expand Down Expand Up @@ -490,7 +490,7 @@ private void setForInsert(long jobId) {
JobProcessor jobProc = new LoadProcessor(this.coordinatorContext, jobId);
this.coordinatorContext.setJobProcessor(jobProc);
// Set this field to true to avoid data entering the normal cache LRU queue
this.coordinatorContext.queryOptions.setDisableFileCache(true);
this.coordinatorContext.queryOptions.setEnableFileCacheOlapTable(false);
this.coordinatorContext.queryOptions.setNewVersionUnixTimestamp(true);
}

Expand Down
47 changes: 42 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,15 @@ public class SessionVariable implements Serializable, Writable {
public static final String TOPN_FILTER_RATIO = "topn_filter_ratio";
public static final String ENABLE_SNAPSHOT_POINT_QUERY = "enable_snapshot_point_query";

// deprecated
public static final String ENABLE_FILE_CACHE = "enable_file_cache";

// deprecated
public static final String DISABLE_FILE_CACHE = "disable_file_cache";

public static final String ENABLE_FILE_CACHE_OLAP_TABLE = "enable_file_cache_olap_table";
public static final String ENABLE_FILE_CACHE_EXTERNAL_CATALOG = "enable_file_cache_external_catalog";

public static final String FILE_CACHE_QUERY_LIMIT_PERCENT = "file_cache_query_limit_percent";

public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path";
Expand Down Expand Up @@ -2171,17 +2176,33 @@ public boolean isEnableHboNonStrictMatchingMode() {

// Whether disable block file cache. Block cache only works when FE's query options sets disableFileCache false
// along with BE's config `enable_file_cache` true
@VariableMgr.VarAttr(name = DISABLE_FILE_CACHE, needForward = true)
@VariableMgr.VarAttr(name = DISABLE_FILE_CACHE, needForward = true, varType = VariableAnnotation.DEPRECATED)
public boolean disableFileCache = false;

// Whether enable block file cache. Only take effect when BE config item enable_file_cache is true.
@VariableMgr.VarAttr(name = ENABLE_FILE_CACHE, needForward = true, description = {
"是否启用 file cache。该变量只有在 be.conf 中 enable_file_cache=true 时才有效,"
@VariableMgr.VarAttr(name = ENABLE_FILE_CACHE, needForward = true, varType = VariableAnnotation.DEPRECATED,
description = {"是否启用 file cache。该变量只有在 be.conf 中 enable_file_cache=true 时才有效,"
+ "如果 be.conf 中 enable_file_cache=false,该 BE 节点的 file cache 处于禁用状态。",
"Set wether to use file cache. This variable takes effect only if the BE config enable_file_cache=true. "
+ "The cache is not used when BE config enable_file_cache=false."})
"Set whether to use file cache. This variable takes effect only if the BE config "
+ "enable_file_cache=true. The cache is not used when BE config enable_file_cache=false."})
public boolean enableFileCache = false;

@VariableMgr.VarAttr(name = ENABLE_FILE_CACHE_OLAP_TABLE, needForward = true, description = {
"是否在存算分离场景下启用 file cache。该变量只有在 be.conf 中 enable_file_cache=true 时才有效,"
+ "如果 be.conf 中 enable_file_cache=false,该 BE 节点的 file cache 处于禁用状态。",
"Set whether to enable file cache for OLAP tables in cloud mode. "
+ "This variable takes effect only if the BE config enable_file_cache=true. "
+ "The cache is not used when BE config enable_file_cache=false."})
public boolean enableFileCacheOlapTable = true;

@VariableMgr.VarAttr(name = ENABLE_FILE_CACHE_EXTERNAL_CATALOG, needForward = true, description = {
"是否在湖仓一体场景下启用 file cache。该变量只有在 be.conf 中 enable_file_cache=true 时才有效,"
+ "如果 be.conf 中 enable_file_cache=false,该 BE 节点的 file cache 处于禁用状态。",
"Set whether to enable file cache for external catalogs in lakehouse scenarios. "
+ "This variable takes effect only if the BE config enable_file_cache=true. "
+ "The cache is not used when BE config enable_file_cache=false."})
public boolean enableFileCacheExternalCatalog = false;

// Specify base path for file cache, or chose a random path.
@VariableMgr.VarAttr(name = FILE_CACHE_BASE_PATH, needForward = true, description = {
"指定 block file cache 在 BE 上的存储路径,默认 'random',随机选择 BE 配置的存储路径。",
Expand Down Expand Up @@ -4971,6 +4992,22 @@ public void setEnableFileCache(boolean enableFileCache) {
this.enableFileCache = enableFileCache;
}

public boolean isEnableFileCacheOlapTable() {
return enableFileCacheOlapTable;
}

public void setEnableFileCacheOlapTable(boolean enableFileCacheOlapTable) {
this.enableFileCacheOlapTable = enableFileCacheOlapTable;
}

public boolean isEnableFileCacheExternalCatalog() {
return enableFileCacheExternalCatalog;
}

public void setEnableFileCacheExternalCatalog(boolean enableFileCacheExternalCatalog) {
this.enableFileCacheExternalCatalog = enableFileCacheExternalCatalog;
}

public String getFileCacheBasePath() {
return fileCacheBasePath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static AutoCloseConnectContext buildConnectContext(boolean useFileCacheFo
LOG.warn("failed to connect to cloud cluster", e);
return ctx;
}
sessionVariable.disableFileCache = !useFileCacheForStat;
sessionVariable.enableFileCacheOlapTable = useFileCacheForStat;
return ctx;
} else {
return new AutoCloseConnectContext(connectContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1393,8 +1393,8 @@ public void testAdminRotateTdeRootKey() {
@Test
public void testWarmUpSelect() {
ConnectContext ctx = ConnectContext.get();
ctx.getSessionVariable().setEnableFileCache(true);
ctx.getSessionVariable().setDisableFileCache(false);
ctx.getSessionVariable().setEnableFileCacheOlapTable(true);
ctx.getSessionVariable().setEnableFileCacheExternalCatalog(true);
NereidsParser nereidsParser = new NereidsParser();

// Test basic warm up select statement
Expand Down
7 changes: 7 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ struct TQueryOptions {
// deprecated
60: optional i32 partitioned_hash_agg_rows_threshold = 0 // deprecated

// deprecated
61: optional bool enable_file_cache = false

62: optional i32 insert_timeout = 14400
Expand Down Expand Up @@ -424,6 +425,7 @@ struct TQueryOptions {
183: optional bool enable_use_hybrid_sort = false;
184: optional i32 cte_max_recursion_depth;


185: optional bool enable_parquet_file_page_cache = true;

186: optional bool enable_streaming_agg_hash_join_force_passthrough;
Expand All @@ -441,8 +443,13 @@ struct TQueryOptions {
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.

// deprecated
1000: optional bool disable_file_cache = false
1001: optional i32 file_cache_query_limit_percent = -1

1002: optional bool enable_file_cache_olap_table = true;
1003: optional bool enable_file_cache_external_catalog = false;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') {
def testCase = { table ->
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
sql """set enable_file_cache=true"""
sql """set enable_file_cache_olap_table=true"""
sql """CREATE TABLE $table (
`id` BIGINT,
`deleted` TINYINT,
Expand Down
18 changes: 8 additions & 10 deletions regression-test/suites/cloud_p0/cache/test_load_cache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import groovy.json.JsonSlurper
/*
Test Description:

1. When disable_file_cache = true and enable_file_cache = true, it is expected that the S3 TVF load (import phase) will NOT enter the cache, while the query
1. When enable_file_cache_olap_table = false, it is expected that the S3 TVF load (import phase) will NOT enter the cache, while the query
phase will enter the Disposable queue.
Specifically: Normal queue size should be 0, Disposable queue size should be 91163 bytes.
2. When disable_file_cache = false and enable_file_cache = true, it is expected that the S3 TVF load (import phase) will enter the Normal queue, and the query
phase will still enter the Disposable queue.
2. When enable_file_cache_olap_table = true, it is expected that the S3 TVF load (import phase) will enter the Normal queue, and the query
phase will still enter the Disposable queue.
Specifically: Normal queue size should be 236988 bytes, Disposable queue size should still be 91163 bytes.

Explanation: The query phase caches the compressed file, so the Disposable queue size is checked for an exact value; for the import phase cache, since future
Explanation: The query phase caches the compressed file, so the Disposable queue size is checked for an exact value; for the import phase cache, since future
changes to statistics are possible, only a reasonable range is required.
*/

Expand Down Expand Up @@ -173,15 +173,14 @@ suite('test_load_cache', 'docker') {
def s3_tvf_uri = "s3://${s3_bucket}/regression/tpch/sf0.01/customer.csv.gz"

// ============================================================================
// SCENARIO 1: disable_file_cache = true
// SCENARIO 1: enable_file_cache_olap_table = false
// ============================================================================

// Clear file cache before test
clearFileCacheOnAllBackends()

// Set session variables for Scenario 1
sql "set disable_file_cache = true;"
sql "set enable_file_cache = true;"
sql "set enable_file_cache_olap_table = false;"

// Create test table
sql """DROP TABLE IF EXISTS load_test_table"""
Expand Down Expand Up @@ -264,15 +263,14 @@ suite('test_load_cache', 'docker') {
sleep(3000)

// ============================================================================
// SCENARIO 2: disable_file_cache = false
// SCENARIO 2: enable_file_cache_olap_table = true
// ============================================================================

// Clear file cache before test
clearFileCacheOnAllBackends()

// Set session variables for Scenario 2
sql "set disable_file_cache = false;"
sql "set enable_file_cache = true;"
sql "set enable_file_cache_olap_table = true;"

// Create test table
sql """DROP TABLE IF EXISTS load_test_table"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ suite("test_warmup_select") {

def test_basic_warmup = {
// Enable file cache for warm up functionality
sql "set disable_file_cache=false"
sql "set enable_file_cache_olap_table=true"

sql "WARM UP SELECT * FROM lineitem"

Expand All @@ -72,7 +72,7 @@ suite("test_warmup_select") {

def test_warmup_negative_cases = {
// Enable file cache for warm up functionality
sql "set disable_file_cache=false"
sql "set enable_file_cache_olap_table=true"

// These should fail as warm up select doesn't support these operations
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ suite("test_file_cache_features", "external_docker,hive,external_docker_hive,p0,
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")

sql """set global enable_file_cache=true"""
sql """set global enable_file_cache_external_catalog=true"""
sql """drop catalog if exists ${catalog_name} """

sql """CREATE CATALOG ${catalog_name} PROPERTIES (
Expand Down Expand Up @@ -236,6 +236,6 @@ suite("test_file_cache_features", "external_docker,hive,external_docker_hive,p0,
}
// ===== End File Cache Features Metrics Check =====

sql """set global enable_file_cache=false"""
sql """set global enable_file_cache_external_catalog=false"""
return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ suite("test_file_cache_query_limit", "external_docker,hive,external_docker_hive,
return
}

sql """set enable_file_cache=true"""
sql """set enable_file_cache_external_catalog=true"""

// Check backend configuration prerequisites
// Note: This test case assumes a single backend scenario. Testing with single backend is logically equivalent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ suite("test_file_cache_statistics", "external_docker,hive,external_docker_hive,p
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")

sql """set global enable_file_cache=true"""
sql """set global enable_file_cache_external_catalog=true"""
sql """drop catalog if exists ${catalog_name} """

sql """CREATE CATALOG ${catalog_name} PROPERTIES (
Expand Down Expand Up @@ -253,7 +253,7 @@ suite("test_file_cache_statistics", "external_docker,hive,external_docker_hive,p
assertTrue(false, TOTAL_READ_COUNTS_DID_NOT_INCREASE_MSG)
}
// ===== End Hit and Read Counts Metrics Check =====
sql """set global enable_file_cache=false"""
sql """set global enable_file_cache_external_catalog=false"""
return true
}

Loading