diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 9e53260e42773..6575b5e4e0364 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -443,10 +443,7 @@ mod tests { use super::*; use datafusion::{ common::test_util::batches_to_string, - execution::cache::{ - DefaultListFilesCache, cache_manager::CacheManagerConfig, - cache_unit::DefaultFileStatisticsCache, - }, + execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig}, prelude::{ParquetReadOptions, col, lit, split_part}, }; use insta::assert_snapshot; @@ -647,9 +644,9 @@ mod tests { +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ - | alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false | + | alltypes_plain.parquet | 1851 | 8882 | 8 | page_index=false | | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false | + | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 4 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); @@ -689,55 +686,6 @@ mod tests { // When the cache manager creates a StatisticsCache by default, // the contents will show up here - let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename"; - let df = ctx.sql(sql).await?; - let rbs = df.collect().await?; - assert_snapshot!(batches_to_string(&rbs),@r" - ++ - ++ - "); - - Ok(()) - } - - // Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved - #[tokio::test] - async fn test_statistics_cache_override() -> Result<(), DataFusionError> { - // Install a specific StatisticsCache implementation - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); - let cache_config = CacheManagerConfig::default() - .with_files_statistics_cache(Some(file_statistics_cache.clone())); - let runtime = RuntimeEnvBuilder::new() - .with_cache_manager(cache_config) - .build()?; - let config = SessionConfig::new().with_collect_statistics(true); - let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime)); - - ctx.register_udtf( - "statistics_cache", - Arc::new(StatisticsCacheFunc::new( - ctx.task_ctx().runtime_env().cache_manager.clone(), - )), - ); - - for filename in [ - "alltypes_plain", - "alltypes_tiny_pages", - "lz4_raw_compressed_larger", - ] { - ctx.sql( - format!( - "create external table {filename} - stored as parquet - location '../parquet-testing/data/{filename}.parquet'", - ) - .as_str(), - ) - .await? 
-            .collect()
-            .await?;
-    }
-
        let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
        let df = ctx.sql(sql).await?;
        let rbs = df.collect().await?;
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 38456944075fc..010939b74bbba 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -36,7 +36,6 @@ use datafusion_datasource::{
 };
 use datafusion_execution::cache::TableScopedPath;
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
@@ -188,7 +187,7 @@ pub struct ListingTable {
     /// The SQL definition for this table, if any
     definition: Option<String>,
     /// Cache for collected file statistics
-    collected_statistics: Arc<dyn FileStatisticsCache>,
+    collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
     /// Constraints applied to this table
     constraints: Constraints,
     /// Column default expressions for columns that are not physically present in the data files
@@ -232,7 +231,7 @@ impl ListingTable {
             schema_source,
             options,
             definition: None,
-            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
+            collected_statistics: None,
             constraints: Constraints::default(),
             column_defaults: HashMap::new(),
             expr_adapter_factory: config.expr_adapter_factory,
@@ -261,10 +260,8 @@ impl ListingTable {
     /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
     /// multiple times in the same session.
     ///
-    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
     pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
-        self.collected_statistics =
-            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
+        self.collected_statistics = cache;
         self
     }
@@ -810,7 +807,8 @@ impl ListingTable {
         let meta = &part_file.object_meta;

         // Check cache first - if we have valid cached statistics and ordering
-        if let Some(cached) = self.collected_statistics.get(path)
+        if let Some(cache) = &self.collected_statistics
+            && let Some(cached) = cache.get(path)
             && cached.is_valid_for(meta)
         {
             // Return cached statistics and ordering
@@ -827,14 +825,16 @@ impl ListingTable {
         let statistics = Arc::new(file_meta.statistics);

         // Store in cache
-        self.collected_statistics.put(
-            path,
-            CachedFileMetadata::new(
-                meta.clone(),
-                Arc::clone(&statistics),
-                file_meta.ordering.clone(),
-            ),
-        );
+        if let Some(cache) = &self.collected_statistics {
+            cache.put(
+                path,
+                CachedFileMetadata::new(
+                    meta.clone(),
+                    Arc::clone(&statistics),
+                    file_meta.ordering.clone(),
+                ),
+            );
+        }

         Ok((statistics, file_meta.ordering))
     }
diff --git a/datafusion/common/src/heap_size.rs b/datafusion/common/src/heap_size.rs
new file mode 100644
index 0000000000000..6dee7d5c0a373
--- /dev/null
+++ b/datafusion/common/src/heap_size.rs
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
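+
+//! Best-effort estimation of the heap memory held by statistics types.
+//! [`DFHeapSize`] is used by the file statistics cache to account for,
+//! and enforce, its configured memory limit.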
+
+use crate::stats::Precision;
+use crate::{ColumnStatistics, ScalarValue, Statistics};
+use arrow::array::{
+    Array, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray,
+};
+use arrow::datatypes::{
+    DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit,
+    TimeUnit, UnionFields, UnionMode, i256, UnionMode as _,
+};
+use chrono::{DateTime, Utc};
+use half::f16;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This is a temporary solution until the relevant upstream tracking issues
+/// are resolved.
+/// Trait for calculating the size of various containers
+pub trait DFHeapSize {
+    /// Return the size of any bytes allocated on the heap by this object,
+    /// including heap memory in those structures
+    ///
+    /// Note that the size of the type itself is not included in the result --
+    /// instead, that size is added by the caller (e.g. container).
+    fn heap_size(&self) -> usize;
+}
+
+impl DFHeapSize for Statistics {
+    fn heap_size(&self) -> usize {
+        self.num_rows.heap_size()
+            + self.total_byte_size.heap_size()
+            + self
+                .column_statistics
+                .iter()
+                .map(|s| s.heap_size())
+                .sum::<usize>()
+    }
+}
+
+impl<T: DFHeapSize + Debug + Clone + PartialEq + Eq + PartialOrd> DFHeapSize
+    for Precision<T>
+{
+    fn heap_size(&self) -> usize {
+        self.get_value().map_or_else(|| 0, |v| v.heap_size())
+    }
+}
+
+impl DFHeapSize for ColumnStatistics {
+    fn heap_size(&self) -> usize {
+        self.null_count.heap_size()
+            + self.max_value.heap_size()
+            + self.min_value.heap_size()
+            + self.sum_value.heap_size()
+            + self.distinct_count.heap_size()
+            + self.byte_size.heap_size()
+    }
+}
+
+impl DFHeapSize for ScalarValue {
+    fn heap_size(&self) -> usize {
+        use crate::scalar::ScalarValue::*;
+        match self {
+            Null => 0,
+            Boolean(b) => b.heap_size(),
+            Float16(f) => f.heap_size(),
+            Float32(f) => f.heap_size(),
+            Float64(f) => f.heap_size(),
+            Decimal32(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(),
+            Decimal64(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(),
+            Decimal128(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(),
+            Decimal256(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(),
+            Int8(i) => i.heap_size(),
+            Int16(i) => i.heap_size(),
+            Int32(i) => i.heap_size(),
+            Int64(i) => i.heap_size(),
+            UInt8(u) => u.heap_size(),
+            UInt16(u) => u.heap_size(),
+            UInt32(u) => u.heap_size(),
+            UInt64(u) => u.heap_size(),
+            Utf8(u) => u.heap_size(),
+            Utf8View(u) => u.heap_size(),
+            LargeUtf8(l) => l.heap_size(),
+            Binary(b) => b.heap_size(),
+            BinaryView(b) => b.heap_size(),
+            FixedSizeBinary(a, b) => a.heap_size() + b.heap_size(),
+            LargeBinary(l) => l.heap_size(),
+            FixedSizeList(f) => f.heap_size(),
+            List(l) => l.heap_size(),
+            LargeList(l) => l.heap_size(),
+            Struct(s) => s.heap_size(),
+            Map(m) => m.heap_size(),
+            Date32(d) => d.heap_size(),
+            Date64(d) => d.heap_size(),
+            Time32Second(t) => t.heap_size(),
+            Time32Millisecond(t) => t.heap_size(),
+            Time64Microsecond(t) => t.heap_size(),
+            Time64Nanosecond(t) => t.heap_size(),
+            TimestampSecond(a, b) => a.heap_size() + b.heap_size(),
+            TimestampMillisecond(a, b) => a.heap_size() + b.heap_size(),
+            TimestampMicrosecond(a, b) => a.heap_size() + b.heap_size(),
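+            // Note: in the timestamp variants the second field is the optional
+            // timezone (an `Option<Arc<str>>`), which relies on the `Arc<str>`
+            // and `Option<T>` implementations further down in this file.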
+            TimestampNanosecond(a, b) => a.heap_size() + b.heap_size(),
+            IntervalYearMonth(i) => i.heap_size(),
+            IntervalDayTime(i) => i.heap_size(),
+            IntervalMonthDayNano(i) => i.heap_size(),
+            DurationSecond(d) => d.heap_size(),
+            DurationMillisecond(d) => d.heap_size(),
+            DurationMicrosecond(d) => d.heap_size(),
+            DurationNanosecond(d) => d.heap_size(),
+            Union(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(),
+            Dictionary(a, b) => a.heap_size() + b.heap_size(),
+        }
+    }
+}
+
+impl DFHeapSize for DataType {
+    fn heap_size(&self) -> usize {
+        use DataType::*;
+        match self {
+            Null => 0,
+            Boolean => 0,
+            Int8 => 0,
+            Int16 => 0,
+            Int32 => 0,
+            Int64 => 0,
+            UInt8 => 0,
+            UInt16 => 0,
+            UInt32 => 0,
+            UInt64 => 0,
+            Float16 => 0,
+            Float32 => 0,
+            Float64 => 0,
+            Timestamp(t, s) => t.heap_size() + s.heap_size(),
+            Date32 => 0,
+            Date64 => 0,
+            Time32(t) => t.heap_size(),
+            Time64(t) => t.heap_size(),
+            Duration(t) => t.heap_size(),
+            Interval(i) => i.heap_size(),
+            Binary => 0,
+            FixedSizeBinary(i) => i.heap_size(),
+            LargeBinary => 0,
+            BinaryView => 0,
+            Utf8 => 0,
+            LargeUtf8 => 0,
+            Utf8View => 0,
+            List(v) => v.heap_size(),
+            ListView(v) => v.heap_size(),
+            FixedSizeList(f, i) => f.heap_size() + i.heap_size(),
+            LargeList(l) => l.heap_size(),
+            LargeListView(l) => l.heap_size(),
+            Struct(s) => s.heap_size(),
+            Union(u, m) => u.heap_size() + m.heap_size(),
+            Dictionary(a, b) => a.heap_size() + b.heap_size(),
+            Decimal32(p, s) => p.heap_size() + s.heap_size(),
+            Decimal64(p, s) => p.heap_size() + s.heap_size(),
+            Decimal128(p, s) => p.heap_size() + s.heap_size(),
+            Decimal256(p, s) => p.heap_size() + s.heap_size(),
+            Map(m, b) => m.heap_size() + b.heap_size(),
+            RunEndEncoded(a, b) => a.heap_size() + b.heap_size(),
+        }
+    }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Vec<T> {
+    fn heap_size(&self) -> usize {
+        let item_size = size_of::<T>();
+        // account for the contents of the Vec
+        (self.capacity() * item_size) +
+        // add any heap allocations by contents
+        self.iter().map(|t| t.heap_size()).sum::<usize>()
+    }
+}
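+
+// Example for the `Vec<T>` accounting above: on a 64-bit target, a
+// `Vec<String>` with capacity 4 holding ["ab", "cde"] reports
+// 4 * 24 + (2 + 3) = 101 bytes, assuming both strings were allocated with
+// exact capacity.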
+
+impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
+    fn heap_size(&self) -> usize {
+        let capacity = self.capacity();
+        if capacity == 0 {
+            return 0;
+        }
+
+        // HashMap doesn't provide a way to get its heap size, so this is an approximation based on
+        // the behavior of hashbrown::HashMap as of version 0.16.0, and may become inaccurate
+        // if the implementation changes.
+        let key_val_size = size_of::<(K, V)>();
+        // Overhead for the control tags group, which may be smaller depending on architecture
+        let group_size = 16;
+        // 1 byte of metadata stored per bucket.
+        let metadata_size = 1;
+
+        // Compute the number of buckets for the capacity. Based on hashbrown's capacity_to_buckets
+        let buckets = if capacity < 15 {
+            let min_cap = match key_val_size {
+                0..=1 => 14,
+                2..=3 => 7,
+                _ => 3,
+            };
+            let cap = min_cap.max(capacity);
+            if cap < 4 {
+                4
+            } else if cap < 8 {
+                8
+            } else {
+                16
+            }
+        } else {
+            (capacity.saturating_mul(8) / 7).next_power_of_two()
+        };
+
+        group_size
+            + (buckets * (key_val_size + metadata_size))
+            + self.keys().map(|k| k.heap_size()).sum::<usize>()
+            + self.values().map(|v| v.heap_size()).sum::<usize>()
+    }
+}
+
+impl DFHeapSize for Arc<Statistics> {
+    fn heap_size(&self) -> usize {
+        // Arc stores weak and strong counts on the heap alongside an instance of T
+        2 * size_of::<usize>() + size_of::<Statistics>() + self.as_ref().heap_size()
+    }
+}
+
+impl DFHeapSize for Arc<str> {
+    fn heap_size(&self) -> usize {
+        2 * size_of::<usize>() + size_of_val(self.as_ref()) + self.as_ref().heap_size()
+    }
+}
+
+impl DFHeapSize for Fields {
+    fn heap_size(&self) -> usize {
+        self.into_iter().map(|f| f.heap_size()).sum::<usize>()
+    }
+}
+
+impl DFHeapSize for StructArray {
+    fn heap_size(&self) -> usize {
+        self.get_array_memory_size()
+    }
+}
+
+impl DFHeapSize for LargeListArray {
+    fn heap_size(&self) -> usize {
+        self.get_array_memory_size()
+    }
+}
+
+impl DFHeapSize for ListArray {
+    fn heap_size(&self) -> usize {
+        self.get_array_memory_size()
+    }
+}
+
+impl DFHeapSize for FixedSizeListArray {
+    fn heap_size(&self) -> usize {
+        self.get_array_memory_size()
+    }
+}
+impl DFHeapSize for MapArray {
+    fn heap_size(&self) -> usize {
+        self.get_array_memory_size()
+    }
+}
+
+impl DFHeapSize for Arc<Field> {
+    fn heap_size(&self) -> usize {
+        2 * size_of::<usize>() + self.as_ref().heap_size()
+    }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Box<T> {
+    fn heap_size(&self) -> usize {
+        size_of::<T>() + self.as_ref().heap_size()
+    }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Option<T> {
+    fn heap_size(&self) -> usize {
+        self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0)
+    }
+}
+
+impl<A, B> DFHeapSize for (A, B)
+where
+    A: DFHeapSize,
+    B: DFHeapSize,
+{
+    fn heap_size(&self) -> usize {
+        self.0.heap_size() + self.1.heap_size()
+    }
+}
+
+impl DFHeapSize for String {
+    fn heap_size(&self) -> usize {
+        self.capacity()
+    }
+}
+
+impl DFHeapSize for str {
+    fn heap_size(&self) -> usize {
+        self.len()
+    }
+}
+
+impl DFHeapSize for UnionFields {
+    fn heap_size(&self) -> usize {
+        self.iter().map(|f| f.0.heap_size() + f.1.heap_size()).sum()
+    }
+}
+
+impl DFHeapSize for UnionMode {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
+impl DFHeapSize for TimeUnit {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
+impl DFHeapSize for IntervalUnit {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
+impl DFHeapSize for Field {
+    fn heap_size(&self) -> usize {
+        self.name().heap_size()
+            + self.data_type().heap_size()
+            + self.is_nullable().heap_size()
+            + self.dict_is_ordered().heap_size()
+            + self.metadata().heap_size()
+    }
+}
+
+impl DFHeapSize for IntervalMonthDayNano {
+    fn heap_size(&self) -> usize {
+        self.days.heap_size() + self.months.heap_size() + self.nanoseconds.heap_size()
+    }
+}
+
+impl DFHeapSize for IntervalDayTime {
+    fn heap_size(&self) -> usize {
+        self.days.heap_size() + self.milliseconds.heap_size()
+    }
+}
+
+impl DFHeapSize for DateTime<Utc> {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
+impl DFHeapSize for bool {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+impl DFHeapSize for u8 {
+    fn heap_size(&self) -> usize {
+        0 // no heap allocations
+    }
+}
+
+impl DFHeapSize for u16 {
+    fn heap_size(&self) -> usize {
0 // no heap allocations + } +} + +impl DFHeapSize for u32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for u64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i8 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl DFHeapSize for i64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i128 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i256 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for f16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for f32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl DFHeapSize for f64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for usize { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index fdd04f752455e..d093421f7dcb1 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -46,6 +46,7 @@ pub mod error; pub mod file_options; pub mod format; pub mod hash_utils; +pub mod heap_size; pub mod instant; pub mod metadata; pub mod nested_struct; @@ -61,6 +62,7 @@ pub mod test_util; pub mod tree_node; pub mod types; pub mod utils; + /// Reexport arrow crate pub use arrow; pub use column::Column; diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 98f61a8528aa0..6a2fd5228d9d7 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -488,7 +488,7 @@ mod tests { // Test with collect_statistics enabled let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); let cache_config = CacheManagerConfig::default() - .with_files_statistics_cache(Some(file_statistics_cache.clone())); + .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() .with_cache_manager(cache_config) .build_arc() @@ -518,7 +518,7 @@ mod tests { // Test with collect_statistics disabled let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); let cache_config = CacheManagerConfig::default() - .with_files_statistics_cache(Some(file_statistics_cache.clone())); + .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() .with_cache_manager(cache_config) .build_arc() diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b6c606ff467f9..36158f0c6ca56 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -101,6 +101,7 @@ use datafusion_session::SessionStore; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_execution::cache::cache_unit::DEFAULT_FILE_STATISTICS_MEMORY_LIMIT; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; @@ -1187,6 +1188,10 @@ impl SessionContext { let duration = Self::parse_duration(value)?; builder.with_object_list_cache_ttl(Some(duration)) } + "file_statistics_cache_limit" => { + let limit = 
Self::parse_memory_limit(value)?;
+                builder.with_file_statistics_cache_limit(limit)
+            }
            _ => return plan_err!("Unknown runtime configuration: {variable}"),
            // Remember to update `reset_runtime_variable()` when adding new options
        };
@@ -1226,9 +1231,13 @@
                builder = builder.with_object_list_cache_ttl(DEFAULT_LIST_FILES_CACHE_TTL);
            }
+            "file_statistics_cache_limit" => {
+                builder = builder.with_file_statistics_cache_limit(
+                    DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+                );
+            }
            _ => return plan_err!("Unknown runtime configuration: {variable}"),
        };
-
        *state = SessionStateBuilder::from(state.clone())
            .with_runtime_env(Arc::new(builder.build()?))
            .build();
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index fdefdafa00aa4..01f1c3ae848b2 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -276,7 +276,7 @@ fn get_cache_runtime_state() -> (
    let list_file_cache = Arc::new(DefaultListFilesCache::default());

    let cache_config = cache_config
-        .with_files_statistics_cache(Some(file_static_cache.clone()))
+        .with_file_statistics_cache(Some(file_static_cache.clone()))
        .with_list_files_cache(Some(list_file_cache.clone()));

    let rt = RuntimeEnvBuilder::new()
diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs
index d85892c254570..099d159fc034d 100644
--- a/datafusion/core/tests/sql/runtime_config.rs
+++ b/datafusion/core/tests/sql/runtime_config.rs
@@ -25,6 +25,7 @@ use datafusion::execution::context::TaskContext;
 use datafusion::prelude::SessionConfig;
 use datafusion_execution::cache::DefaultListFilesCache;
 use datafusion_execution::cache::cache_manager::CacheManagerConfig;
+use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 use datafusion_physical_plan::common::collect;

@@ -325,6 +326,51 @@ async fn test_list_files_cache_ttl() {
     assert_eq!(get_limit(&ctx), Duration::from_secs(90));
 }

+#[tokio::test]
+async fn test_file_statistics_cache_limit() {
+    let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
+
+    let rt = RuntimeEnvBuilder::new()
+        .with_cache_manager(
+            CacheManagerConfig::default()
+                .with_file_statistics_cache(Some(file_statistics_cache)),
+        )
+        .build_arc()
+        .unwrap();
+
+    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+
+    let update_limit = async |ctx: &SessionContext, limit: &str| {
+        ctx.sql(
+            format!("SET datafusion.runtime.file_statistics_cache_limit = '{limit}'")
+                .as_str(),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    };
+
+    let get_limit = |ctx: &SessionContext| -> usize {
+        ctx.task_ctx()
+            .runtime_env()
+            .cache_manager
+            .get_file_statistic_cache()
+            .unwrap()
+            .cache_limit()
+    };
+
+    update_limit(&ctx, "1M").await;
+    assert_eq!(get_limit(&ctx), 1024 * 1024);
+
+    update_limit(&ctx, "42G").await;
+    assert_eq!(get_limit(&ctx), 42 * 1024 * 1024 * 1024);
+
+    update_limit(&ctx, "23K").await;
+    assert_eq!(get_limit(&ctx), 23 * 1024);
+}
+
 #[tokio::test]
 async fn test_unknown_runtime_config() {
     let ctx = SessionContext::new();
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index bd34c441bdbde..5e0b3e08cb6b4 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -17,10 +17,14 @@
 use crate::cache::CacheAccessor;
 use crate::cache::DefaultListFilesCache;
-use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use crate::cache::cache_unit::{
+    DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DefaultFileStatisticsCache,
+    DefaultFilesMetadataCache,
+};
 use crate::cache::list_files_cache::ListFilesEntry;
 use crate::cache::list_files_cache::TableScopedPath;
 use datafusion_common::TableReference;
+use datafusion_common::heap_size::DFHeapSize;
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
@@ -41,7 +45,7 @@ pub use super::list_files_cache::{
 ///
 /// This struct embeds the [`ObjectMeta`] used for cache validation,
 /// along with the cached statistics and ordering information.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CachedFileMetadata {
     /// File metadata used for cache validation (size, last_modified).
     pub meta: ObjectMeta,
@@ -81,7 +85,7 @@ impl CachedFileMetadata {
 /// - Statistics for the file
 /// - Ordering information for the file
 ///
-/// If enabled via [`CacheManagerConfig::with_files_statistics_cache`] this
+/// If enabled via [`CacheManagerConfig::with_file_statistics_cache`] this
 /// cache avoids inferring the same file statistics repeatedly during the
 /// session lifetime.
 ///
@@ -92,10 +96,26 @@
 ///
 /// See [`crate::runtime_env::RuntimeEnv`] for more details
 pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
+    /// Returns the current memory limit of the cache, in bytes.
+    fn cache_limit(&self) -> usize;
+
+    /// Updates the cache with a new memory limit in bytes.
+    fn update_cache_limit(&self, limit: usize);
+
     /// Retrieves the information about the entries currently cached.
     fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
 }

+impl DFHeapSize for CachedFileMetadata {
+    fn heap_size(&self) -> usize {
+        self.meta.size.heap_size()
+            + self.meta.last_modified.heap_size()
+            + self.meta.version.heap_size()
+            + self.meta.e_tag.heap_size()
+            + self.meta.location.as_ref().heap_size()
+            + self.statistics.heap_size()
+    }
+}
+
 /// Represents information about a cached statistics entry.
 /// This is used to expose the statistics cache contents to outside modules.
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -330,8 +350,19 @@ pub struct CacheManager {

 impl CacheManager {
     pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
-        let file_statistic_cache =
-            config.table_files_statistics_cache.as_ref().map(Arc::clone);
+        let file_statistic_cache = match &config.file_statistics_cache {
+            Some(fsc) if config.file_statistics_cache_limit > 0 => {
+                fsc.update_cache_limit(config.file_statistics_cache_limit);
+                Some(Arc::clone(fsc))
+            }
+            None if config.file_statistics_cache_limit > 0 => {
+                let fsc: Arc<dyn FileStatisticsCache> = Arc::new(
+                    DefaultFileStatisticsCache::new(config.file_statistics_cache_limit),
+                );
+                Some(fsc)
+            }
+            _ => None,
+        };

         let list_files_cache = match &config.list_files_cache {
             Some(lfc) if config.list_files_cache_limit > 0 => {
@@ -371,11 +402,18 @@ impl CacheManager {
         }))
     }

-    /// Get the cache of listing files statistics.
+    /// Get the file statistics cache.
     pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
         self.file_statistic_cache.clone()
     }

+    /// Get the memory limit of the file statistics cache.
+    pub fn get_file_statistic_cache_limit(&self) -> usize {
+        self.file_statistic_cache
+            .as_ref()
+            .map_or(0, |c| c.cache_limit())
+    }
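+
+    // Note: `cache_limit()` reports the limit currently applied by the running
+    // cache; `try_new` above pushes `file_statistics_cache_limit` into a
+    // user-supplied cache via `update_cache_limit`, so the two stay in sync.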
     /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
     pub fn get_list_files_cache(&self) -> Option<Arc<DefaultListFilesCache>> {
         self.list_files_cache.clone()
     }
@@ -411,7 +449,9 @@ pub struct CacheManagerConfig {
     /// Enable caching of file statistics when listing files.
     /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
     /// Default is disabled. Currently only Parquet files are supported.
-    pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
+    pub file_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
+    /// Limit of the file statistics cache, in bytes. Default: 1MiB.
+    pub file_statistics_cache_limit: usize,
     /// Enable caching of file metadata when listing files.
     /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
     /// expensive in certain situations (e.g. remote object storage), for objects under paths that
@@ -437,7 +477,8 @@ impl Default for CacheManagerConfig {
     fn default() -> Self {
         Self {
-            table_files_statistics_cache: Default::default(),
+            file_statistics_cache: Default::default(),
+            file_statistics_cache_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
             list_files_cache: Default::default(),
             list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
             list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
@@ -448,14 +489,19 @@ impl CacheManagerConfig {
-    /// Set the cache for files statistics.
+    /// Set the cache for file statistics.
     ///
     /// Default is `None` (disabled).
-    pub fn with_files_statistics_cache(
+    pub fn with_file_statistics_cache(
         mut self,
         cache: Option<Arc<dyn FileStatisticsCache>>,
     ) -> Self {
-        self.table_files_statistics_cache = cache;
+        self.file_statistics_cache = cache;
+        self
+    }
+
+    /// Set the memory limit of the file statistics cache, in bytes.
+    pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self {
+        self.file_statistics_cache_limit = limit;
         self
     }
diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs
index d98d23821ec7f..847ba036f5cb2 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -15,17 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::collections::HashMap;
-
 use crate::cache::CacheAccessor;
 use crate::cache::cache_manager::{
     CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
 };
-
-use dashmap::DashMap;
-use object_store::path::Path;
+use std::collections::HashMap;
+use std::sync::Mutex;

 pub use crate::cache::DefaultFilesMetadataCache;
+use crate::cache::lru_queue::LruQueue;
+use datafusion_common::heap_size::DFHeapSize;
+use object_store::path::Path;

 /// Default implementation of [`FileStatisticsCache`]
 ///
@@ -41,32 +41,137 @@
 /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
 #[derive(Default)]
 pub struct DefaultFileStatisticsCache {
-    cache: DashMap<Path, CachedFileMetadata>,
+    state: Mutex<DefaultFileStatisticsCacheState>,
+}
+
+impl DefaultFileStatisticsCache {
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
+        }
+    }
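+
+    // A single `Mutex` around the LRU state (in place of the previous
+    // `DashMap`) keeps the eviction order and the `memory_used` accounting
+    // consistent under concurrent access.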
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
+}
+
+pub struct DefaultFileStatisticsCacheState {
+    lru_queue: LruQueue<Path, CachedFileMetadata>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
+
+impl Default for DefaultFileStatisticsCacheState {
+    fn default() -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+            memory_used: 0,
+        }
+    }
+}
+
+impl DefaultFileStatisticsCacheState {
+    fn new(memory_limit: usize) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+
+    fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> {
+        self.lru_queue.get(key).cloned()
+    }
+
+    fn put(
+        &mut self,
+        key: &Path,
+        value: CachedFileMetadata,
+    ) -> Option<CachedFileMetadata> {
+        let entry_size = value.heap_size();
+
+        if entry_size > self.memory_limit {
+            return None;
+        }
+
+        let old_value = self.lru_queue.put(key.clone(), value);
+        self.memory_used += entry_size;
+
+        if let Some(old_entry) = &old_value {
+            self.memory_used -= old_entry.heap_size();
+        }
+
+        self.evict_entries();
+
+        old_value
+    }
+
+    fn remove(&mut self, k: &Path) -> Option<CachedFileMetadata> {
+        self.lru_queue.remove(k)
+    }
+
+    fn contains_key(&self, k: &Path) -> bool {
+        self.lru_queue.contains_key(k)
+    }
+
+    fn len(&self) -> usize {
+        self.lru_queue.len()
+    }
+
+    fn clear(&mut self) {
+        self.lru_queue.clear();
+        self.memory_used = 0;
+    }
+
+    fn evict_entries(&mut self) {
+        while self.memory_used > self.memory_limit {
+            if let Some(removed) = self.lru_queue.pop() {
+                self.memory_used -= removed.1.heap_size();
+            } else {
+                // cache is empty while memory_used > memory_limit, cannot happen
+                debug_assert!(
+                    false,
+                    "cache is empty while memory_used > memory_limit, cannot happen"
+                );
+                return;
+            }
+        }
+    }
+}
 impl CacheAccessor<Path, CachedFileMetadata> for DefaultFileStatisticsCache {
     fn get(&self, key: &Path) -> Option<CachedFileMetadata> {
-        self.cache.get(key).map(|entry| entry.value().clone())
+        let mut state = self.state.lock().unwrap();
+        state.get(key)
     }

     fn put(&self, key: &Path, value: CachedFileMetadata) -> Option<CachedFileMetadata> {
-        self.cache.insert(key.clone(), value)
+        let mut state = self.state.lock().unwrap();
+        state.put(key, value)
     }

-    fn remove(&self, k: &Path) -> Option<CachedFileMetadata> {
-        self.cache.remove(k).map(|(_, entry)| entry)
+    fn remove(&self, key: &Path) -> Option<CachedFileMetadata> {
+        let mut state = self.state.lock().unwrap();
+        state.remove(key)
     }

     fn contains_key(&self, k: &Path) -> bool {
-        self.cache.contains_key(k)
+        let state = self.state.lock().unwrap();
+        state.contains_key(k)
     }

     fn len(&self) -> usize {
-        self.cache.len()
+        let state = self.state.lock().unwrap();
+        state.len()
     }

     fn clear(&self) {
-        self.cache.clear();
+        let mut state = self.state.lock().unwrap();
+        state.clear();
     }

     fn name(&self) -> String {
@@ -75,12 +180,22 @@ impl CacheAccessor<Path, CachedFileMetadata> for DefaultFileStatisticsCache {
 }

 impl FileStatisticsCache for DefaultFileStatisticsCache {
+    fn cache_limit(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_limit
+    }
+
+    fn update_cache_limit(&self, limit: usize) {
+        let mut state = self.state.lock().unwrap();
+        state.memory_limit = limit;
+        state.evict_entries();
+    }
+
     fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
         let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
-
-        for entry in self.cache.iter() {
-            let path = entry.key();
-            let cached = entry.value();
+        for entry in self.state.lock().unwrap().lru_queue.list_entries() {
+            let path = entry.0.clone();
+            let cached = entry.1.clone();
             entries.insert(
                 path.clone(),
                 FileStatisticsCacheEntry {
@@ -88,7 +203,7 @@ impl FileStatisticsCache for DefaultFileStatisticsCache {
                     num_rows: cached.statistics.num_rows,
                     num_columns: cached.statistics.column_statistics.len(),
                     table_size_bytes: cached.statistics.total_byte_size,
-                    statistics_size_bytes: 0, // TODO: set to the real size in the future
+                    statistics_size_bytes: cached.statistics.heap_size(),
                     has_ordering: cached.ordering.is_some(),
                 },
             );
@@ -105,11 +220,12 @@ mod tests {
     use crate::cache::cache_manager::{
         CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
     };
-    use arrow::array::RecordBatch;
+    use arrow::array::{Int32Array, ListArray, RecordBatch};
+    use arrow::buffer::{OffsetBuffer, ScalarBuffer};
     use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use chrono::DateTime;
-    use datafusion_common::Statistics;
     use datafusion_common::stats::Precision;
+    use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
     use datafusion_expr::ColumnarValue;
     use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
@@ -395,7 +511,7 @@ mod tests {
                     num_rows: Precision::Absent,
                     num_columns: 1,
                     table_size_bytes: Precision::Absent,
-                    statistics_size_bytes: 0,
+                    statistics_size_bytes: 72,
                     has_ordering: false,
                 }
             ),
@@ -406,11 +522,100 @@ mod tests {
                     num_rows: Precision::Absent,
                     num_columns: 1,
                     table_size_bytes: Precision::Absent,
-                    statistics_size_bytes: 0,
+                    statistics_size_bytes: 72,
                     has_ordering: true,
                 }
             ),
         ])
     );
 }
+
+    #[test]
+    fn test_cache_entry_added_when_entries_are_within_cache_limit() {
+        let (meta_1, value_1) = create_cached_file_metadata_with_stats("test1.parquet");
+        let (meta_2, value_2) = create_cached_file_metadata_with_stats("test2.parquet");
+        let (meta_3, value_3) = create_cached_file_metadata_with_stats("test3.parquet");
+
+        let limit_for_2_entries = value_1.heap_size() + value_2.heap_size();
+
+        // create a cache with a limit which fits exactly 2 entries
+        let cache = DefaultFileStatisticsCache::new(limit_for_2_entries);
+
+        cache.put(&meta_1.location, value_1.clone());
+        cache.put(&meta_2.location, value_2.clone());
+
+        assert_eq!(cache.len(), 2);
+        assert_eq!(cache.memory_used(), limit_for_2_entries);
+
+        let result_1 = cache.get(&meta_1.location);
+        let result_2 = cache.get(&meta_2.location);
+        assert_eq!(result_1.unwrap(), value_1);
+        assert_eq!(result_2.unwrap(), value_2);
+
+        // adding the third entry evicts the first entry
+        cache.put(&meta_3.location, value_3.clone());
+        assert_eq!(cache.len(), 2);
+        assert_eq!(cache.memory_used(), limit_for_2_entries);
+
+        let result_1 = cache.get(&meta_1.location);
+        assert!(result_1.is_none());
+
+        let result_2 = cache.get(&meta_2.location);
+        let result_3 = cache.get(&meta_3.location);
+
+        assert_eq!(result_2.unwrap(), value_2);
+        assert_eq!(result_3.unwrap(), value_3);
+
+        cache.remove(&meta_2.location);
+
+        assert_eq!(cache.len(), 1);
+
+        cache.clear();
+        assert_eq!(cache.len(), 0);
+    }
+
+    #[test]
+    fn test_cache_rejects_entry_which_is_too_large() {
+        let (meta, value) = create_cached_file_metadata_with_stats("test1.parquet");
+
+        let limit_less_than_the_entry = value.heap_size() - 1;
+
+        // create a cache with a size less than the entry
+        let cache = DefaultFileStatisticsCache::new(limit_less_than_the_entry);
+
+        cache.put(&meta.location, value);
+
+        assert_eq!(cache.len(), 0);
+        assert_eq!(cache.memory_used(), 0);
+    }
+
+    fn create_cached_file_metadata_with_stats(
+        file_name: &str,
+    ) -> (ObjectMeta, CachedFileMetadata) {
+        let series: Vec<i32> = (0..=10).collect();
+        let values = Int32Array::from(series);
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0]));
+        let field = Arc::new(Field::new_list_field(DataType::Int32, false));
+        let list_array = ListArray::new(field, offsets, Arc::new(values), None);
+
+        let column_statistics = ColumnStatistics {
+            null_count: Precision::Exact(1),
+            max_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
+            min_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
+            sum_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))),
+            distinct_count: Precision::Exact(10),
+            byte_size: Precision::Absent,
+        };
+
+        let stats = Statistics {
+            num_rows: Precision::Exact(100),
+            total_byte_size: Precision::Exact(100),
+            column_statistics: vec![column_statistics.clone()],
+        };
+
+        let object_meta = create_test_meta(file_name, stats.heap_size() as u64);
+        let value =
+            CachedFileMetadata::new(object_meta.clone(), Arc::new(stats.clone()), None);
+        (object_meta, value)
+    }
 }
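A minimal sketch of wiring the new limit programmatically through the builder API added below (the value is illustrative, not part of the patch):

    let runtime = RuntimeEnvBuilder::new()
        .with_file_statistics_cache_limit(64 * 1024 * 1024) // e.g. 64 MiB
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);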
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 67604c424c766..e393a7a127873 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -103,6 +103,7 @@ fn create_runtime_config_entries(
     metadata_cache_limit: Option<String>,
     list_files_cache_limit: Option<String>,
     list_files_cache_ttl: Option<String>,
+    file_statistics_cache_limit: Option<String>,
 ) -> Vec<ConfigEntry> {
     vec![
         ConfigEntry {
@@ -135,6 +136,11 @@ fn create_runtime_config_entries(
             value: list_files_cache_ttl,
             description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.",
         },
+        ConfigEntry {
+            key: "datafusion.runtime.file_statistics_cache_limit".to_string(),
+            value: file_statistics_cache_limit,
+            description: "Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
+        },
     ]
 }
@@ -296,6 +302,14 @@ impl RuntimeEnv {
             .get_list_files_cache_ttl()
             .map(format_duration);

+        let file_statistics_cache_limit =
+            self.cache_manager.get_file_statistic_cache_limit();
+        let file_statistics_cache_value = format_byte_size(
+            file_statistics_cache_limit
+                .try_into()
+                .expect("File statistics cache size conversion failed"),
+        );
+
         create_runtime_config_entries(
             memory_limit_value,
             Some(max_temp_dir_value),
@@ -303,6 +317,7 @@ impl RuntimeEnv {
             Some(metadata_cache_value),
             Some(list_files_cache_value),
             list_files_cache_ttl,
+            Some(file_statistics_cache_value),
         )
     }
 }
@@ -438,6 +453,11 @@ impl RuntimeEnvBuilder {
         self
     }

+    /// Set the memory limit of the file statistics cache, in bytes.
+    pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self {
+        self.cache_manager = self.cache_manager.with_file_statistics_cache_limit(limit);
+        self
+    }
+
     /// Build a RuntimeEnv
     pub fn build(self) -> Result<RuntimeEnv> {
         let Self {
@@ -475,9 +495,10 @@ impl RuntimeEnvBuilder {
     /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv
     pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
         let cache_config = CacheManagerConfig {
-            table_files_statistics_cache: runtime_env
+            file_statistics_cache: runtime_env.cache_manager.get_file_statistic_cache(),
+            file_statistics_cache_limit: runtime_env
                 .cache_manager
-                .get_file_statistic_cache(),
+                .get_file_statistic_cache_limit(),
             list_files_cache: runtime_env.cache_manager.get_list_files_cache(),
             list_files_cache_limit: runtime_env
                 .cache_manager
@@ -514,6 +535,7 @@ impl RuntimeEnvBuilder {
             Some("50M".to_owned()),
             Some("1M".to_owned()),
             None,
+            Some("1M".to_owned()),
         )
     }
diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt
index d580b7d1ad2b8..fd375778b7a53 100644
--- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt
+++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt
@@ -77,6 +77,10 @@ ORDER BY double_field
 3 4
 5 6

+# Disable the file statistics cache, since statistics for these files were already cached earlier in this test
+statement ok
+set datafusion.runtime.file_statistics_cache_limit = "0K";
+
 statement count 0
 CREATE EXTERNAL TABLE parquet_table (
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index e31cdbe0aad23..ba8e551cb8b74 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -323,6 +323,7 @@ datafusion.optimizer.repartition_windows true
 datafusion.optimizer.skip_failed_rules false
 datafusion.optimizer.subset_repartition_threshold 4
 datafusion.optimizer.top_down_join_key_reordering true
+datafusion.runtime.file_statistics_cache_limit 1M
 datafusion.runtime.list_files_cache_limit 1M
 datafusion.runtime.list_files_cache_ttl NULL
 datafusion.runtime.max_temp_directory_size 100G
@@ -460,6 +461,7 @@ datafusion.optimizer.repartition_windows true Should DataFusion repartition data
 datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail
 datafusion.optimizer.subset_repartition_threshold 4 Partition count threshold for subset satisfaction optimization.
When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): ```text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ``` datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys +datafusion.runtime.file_statistics_cache_limit 1M Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. 
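A sketch of how the new setting surfaces through SHOW (sqllogictest style; the 1M output assumes the default configured by this patch):

    query TT
    SHOW datafusion.runtime.file_statistics_cache_limit
    ----
    datafusion.runtime.file_statistics_cache_limit 1M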
diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
index 5a559bdb94835..0587d1faa17f8 100644
--- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
+++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
@@ -177,6 +177,10 @@ physical_plan
 statement ok
 DROP TABLE test_table;

+# Disable the file statistics cache, since statistics for these files were already cached earlier in this test
+statement ok
+set datafusion.runtime.file_statistics_cache_limit = "0K";
+
 statement ok
 CREATE EXTERNAL TABLE test_table (
     partition_col TEXT NOT NULL,
diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt
index c444128b18f4f..a82ab1d32bae8 100644
--- a/datafusion/sqllogictest/test_files/set_variable.slt
+++ b/datafusion/sqllogictest/test_files/set_variable.slt
@@ -351,6 +351,12 @@ RESET datafusion.runtime.memory_limit
 statement ok
 EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1

+statement ok
+SET datafusion.runtime.file_statistics_cache_limit = '1K'
+
+statement ok
+RESET datafusion.runtime.file_statistics_cache_limit
+
 statement ok
 SET datafusion.runtime.list_files_cache_limit = '1K'

@@ -407,6 +413,15 @@ SHOW datafusion.runtime.max_temp_directory_size
 ----
 datafusion.runtime.max_temp_directory_size 10G

+# Test SET and SHOW runtime.file_statistics_cache_limit
+statement ok
+SET datafusion.runtime.file_statistics_cache_limit = '42M'
+
+query TT
+SHOW datafusion.runtime.file_statistics_cache_limit
+----
+datafusion.runtime.file_statistics_cache_limit 42M
+
 # Test SET and SHOW runtime.metadata_cache_limit
 statement ok
 SET datafusion.runtime.metadata_cache_limit = '200M'
@@ -441,6 +456,7 @@ datafusion.runtime.list_files_cache_ttl 1m30s
 query T
 SELECT name FROM information_schema.df_settings WHERE name LIKE 'datafusion.runtime.%' ORDER BY name
 ----
+datafusion.runtime.file_statistics_cache_limit
 datafusion.runtime.list_files_cache_limit
 datafusion.runtime.list_files_cache_ttl
 datafusion.runtime.max_temp_directory_size
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index aaba453b3541f..d0c439c7d413f 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -204,14 +204,15 @@ SET datafusion.runtime.memory_limit = '2G';

 The following runtime configuration settings are available:

-| key | default | description |
-| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
-| datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. |
-| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
-| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.
| -| datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | +| key | default | description | +| ---------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.file_statistics_cache_limit | 1M | Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | # Tuning Guide
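As with the other runtime caches, the limit can be tuned or disabled per session; a usage sketch ('0K' disables the cache, as the sqllogictests above rely on):

```sql
SET datafusion.runtime.file_statistics_cache_limit = '256M';
RESET datafusion.runtime.file_statistics_cache_limit;
```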